Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Framework/advanced.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 6 additions & 0 deletions Framework/basic-no-sampling.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 6 additions & 0 deletions Framework/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 6 additions & 0 deletions Framework/example-default.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
6 changes: 3 additions & 3 deletions Framework/include/QualityControl/ObjectsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ObjectsManager
friend class TaskControl; // TaskControl must be able to call "publish()" whenever needed. Nobody else can.

public:
ObjectsManager(TaskConfig& taskConfig, std::shared_ptr<ServiceDiscovery> serviceDiscovery = nullptr);
ObjectsManager(TaskConfig& taskConfig);
virtual ~ObjectsManager();

/**
Expand Down Expand Up @@ -120,8 +120,8 @@ class ObjectsManager

private:
TObjArray mMonitorObjects;
std::string mTaskName;
std::shared_ptr<ServiceDiscovery> mServiceDiscovery;
TaskConfig& mTaskConfig;
std::unique_ptr<ServiceDiscovery> mServiceDiscovery;
bool mUpdateServiceDiscovery;
};

Expand Down
3 changes: 1 addition & 2 deletions Framework/include/QualityControl/TaskConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ namespace o2::quality_control::core
{

/// \brief Container for the configuration of a Task
///
/// \author Barthelemy von Haller
struct TaskConfig {
std::string taskName;
std::string moduleName;
std::string className;
int cycleDurationSeconds;
int maxNumberCycles;
std::string consulUrl;
};

} // namespace o2::quality_control::core
Expand Down
1 change: 0 additions & 1 deletion Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class TaskRunner : public framework::Task
std::shared_ptr<TaskInterface> mTask;
bool mResetAfterPublish;
std::shared_ptr<ObjectsManager> mObjectsManager;
std::shared_ptr<ServiceDiscovery> mServiceDiscovery;

// consider moving these to TaskConfig
framework::Inputs mInputSpecs;
Expand Down
6 changes: 6 additions & 0 deletions Framework/readout-no-sampling.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
"Activity": {
"number": "42",
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
5 changes: 4 additions & 1 deletion Framework/readout.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
"type": "2"
},
"monitoring": {
"url": "infologger:///debug?qc"
"url": "infologger:///debug?qc"
},
"consul": {
"url": "http://consul-test.cern.ch:8500"
}
},
"tasks": {
Expand Down
7 changes: 5 additions & 2 deletions Framework/src/ObjectsManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ using namespace std;
namespace o2::quality_control::core
{

ObjectsManager::ObjectsManager(TaskConfig& taskConfig, std::shared_ptr<ServiceDiscovery> serviceDiscovery) : mTaskName(taskConfig.taskName), mServiceDiscovery(serviceDiscovery), mUpdateServiceDiscovery(false)
ObjectsManager::ObjectsManager(TaskConfig& taskConfig) : mTaskConfig(taskConfig), mUpdateServiceDiscovery(false)
{
mMonitorObjects.SetOwner(true);

// register with the discovery service
mServiceDiscovery = std::make_unique<ServiceDiscovery>(taskConfig.consulUrl, taskConfig.taskName);
}

ObjectsManager::~ObjectsManager()
Expand All @@ -40,7 +43,7 @@ void ObjectsManager::startPublishing(TObject* object)
<< infologger::endm;
BOOST_THROW_EXCEPTION(DuplicateObjectError() << errinfo_object_name(object->GetName()));
}
auto* newObject = new MonitorObject(object, mTaskName);
auto* newObject = new MonitorObject(object, mTaskConfig.taskName);
newObject->setIsOwner(false);
mMonitorObjects.Add(newObject);
mUpdateServiceDiscovery = true;
Expand Down
23 changes: 18 additions & 5 deletions Framework/src/ServiceDiscovery.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& id
ServiceDiscovery::~ServiceDiscovery()
{
mThreadRunning = false;
mHealthThread.join();
if (mHealthThread.joinable()) {
mHealthThread.join();
}
deregister();
}

Expand Down Expand Up @@ -102,12 +104,23 @@ void ServiceDiscovery::runHealthServer(unsigned int port)
try {
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port));
boost::asio::deadline_timer timer(io_service);
while (mThreadRunning) {
io_service.reset();
timer.expires_from_now(boost::posix_time::seconds(1));
timer.async_wait([&](boost::system::error_code ec) {
if (!ec)
acceptor.cancel();
});
tcp::socket socket(io_service);
acceptor.accept(socket);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
acceptor.async_accept(socket, [&](boost::system::error_code ec) {
if (!ec)
timer.cancel();
});
std::this_thread::sleep_for(std::chrono::seconds(1));
}
} catch (std::exception& e) {
mThreadRunning = false;
std::cerr << e.what() << std::endl;
}
}
Expand All @@ -129,10 +142,10 @@ void ServiceDiscovery::send(const std::string& path, std::string&& post)
response = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode);
if (response != CURLE_OK) {
std::cerr << curl_easy_strerror(response) << std::endl;
std::cerr << "ServiceDiscovery: " << curl_easy_strerror(response) << ": " << uri << std::endl;
}
if (responseCode < 200 || responseCode > 206) {
std::cerr << "Response code : " << responseCode << std::endl;
std::cerr << "ServiceDiscovery: Response code: " << responseCode << std::endl;
}
}
} // namespace o2::quality_control::core
8 changes: 2 additions & 6 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ TaskRunner::TaskRunner(const std::string& taskName, const std::string& configura
: mDeviceName(createTaskRunnerIdString() + "-" + taskName),
mTask(nullptr),
mResetAfterPublish(false),
mServiceDiscovery(std::make_shared<ServiceDiscovery>("http://consul-test.cern.ch:8500", taskName, "")),
mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id),
mNumberBlocks(0),
mLastNumberObjects(0),
Expand All @@ -57,10 +56,6 @@ TaskRunner::TaskRunner(const std::string& taskName, const std::string& configura
// setup configuration
mConfigFile = ConfigurationFactory::getConfiguration(configurationSource);
populateConfig(taskName);

// register with the discovery service
mServiceDiscovery = std::make_unique<ServiceDiscovery>("http://consul-test.cern.ch:8500", mTaskConfig.taskName, "");
mServiceDiscovery->_register("obj1,obj2,obj3");
}

TaskRunner::~TaskRunner() = default;
Expand All @@ -80,7 +75,7 @@ void TaskRunner::init(InitContext& iCtx)
mCollector->enableProcessMonitoring();

// setup publisher
mObjectsManager = std::make_shared<ObjectsManager>(mTaskConfig, mServiceDiscovery);
mObjectsManager = std::make_shared<ObjectsManager>(mTaskConfig);

// setup user's task
TaskFactory f;
Expand Down Expand Up @@ -257,6 +252,7 @@ void TaskRunner::populateConfig(std::string taskName)
mTaskConfig.className = taskConfigTree->second.get<std::string>("className");
mTaskConfig.cycleDurationSeconds = taskConfigTree->second.get<int>("cycleDurationSeconds", 10);
mTaskConfig.maxNumberCycles = taskConfigTree->second.get<int>("maxNumberCycles", -1);
mTaskConfig.consulUrl = mConfigFile->get<std::string>("qc.config.consul.url", "http://consul-test.cern.ch:8500");

auto policiesFilePath = mConfigFile->get<std::string>("dataSamplingPolicyFile", "");
ConfigurationInterface* config = policiesFilePath.empty() ? mConfigFile.get() : ConfigurationFactory::getConfiguration(policiesFilePath).get();
Expand Down
14 changes: 7 additions & 7 deletions Framework/test/testDbFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ BOOST_AUTO_TEST_CASE(db_ccdb_listing)

// test getting objects list from task
auto objectNames = ccdb->getPublishedObjectNames("functional_test");
// cout << "objects in task functional_test" << endl;
// for (auto name : objectNames) {
// cout << " - object : " << name << endl;
// }
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "object1") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "object2") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "path/to/object3") != objectNames.end());
// cout << "objects in task functional_test" << endl;
// for (auto name : objectNames) {
// cout << " - object : " << name << endl;
// }
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/object1") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/object2") != objectNames.end());
BOOST_CHECK(std::find(objectNames.begin(), objectNames.end(), "/path\\/to\\/object3") != objectNames.end());

// store list of streamer infos
// ccdb->storeStreamerInfosToFile("streamerinfos.root");
Expand Down
1 change: 1 addition & 0 deletions Framework/test/testInfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ using namespace o2::framework;

BOOST_AUTO_TEST_CASE(qc_factory_local_test)
{
BOOST_REQUIRE_NE(getenv("QUALITYCONTROL_ROOT"), nullptr);
std::string configFilePath = std::string("json:/") + getenv("QUALITYCONTROL_ROOT") + "/tests/testQCFactory.json";

{
Expand Down
9 changes: 9 additions & 0 deletions Framework/test/testObjectsManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ using namespace AliceO2::Common;
namespace o2::quality_control::core
{

BOOST_AUTO_TEST_CASE(invalid_url_test)
{
TaskConfig config;
config.taskName = "test";
config.consulUrl = "bad-url:1234";
ObjectsManager objectsManager(config);
}

BOOST_AUTO_TEST_CASE(duplicate_object_test)
{
TaskConfig config;
config.taskName = "test";
config.consulUrl = "http://consul-test.cern.ch:8500";
ObjectsManager objectsManager(config);
TObjString s("content");
objectsManager.startPublishing(&s);
Expand Down