Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions Framework/include/QualityControl/ServiceDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include <string>
#include <thread>
#include <boost/asio/ip/host_name.hpp>
#include <random>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/io_service.hpp>
#include "QualityControl/QcInfoLogger.h"

namespace o2::quality_control::core
{
Expand All @@ -39,7 +43,7 @@ class ServiceDiscovery
/// \param id Unique instance ID
/// \param healthEndpoint Local endpoint that is then used for health checks
/// (default value is set to <hostname>:7777)
ServiceDiscovery(const std::string& url, const std::string& name, const std::string& id, const std::string& healthEndpoint = GetDefaultUrl());
ServiceDiscovery(const std::string& url, const std::string& name, const std::string& id, const std::string& healthEndUrl = GetDefaultUrl());

/// Stops the health thread and deregisters from Consul health checks
~ServiceDiscovery();
Expand All @@ -51,17 +55,60 @@ class ServiceDiscovery
/// Deregisters service
void deregister();

// https://stackoverflow.com/questions/33358321/using-c-and-boost-or-not-to-check-if-a-specific-port-is-being-used
static inline bool PortInUse(unsigned short port)
{
using namespace boost::asio;
using ip::tcp;

boost::asio::io_service svc;
tcp::acceptor a(svc);

boost::system::error_code ec;
a.open(tcp::v4(), ec) || a.bind({ tcp::v4(), port }, ec);

return ec == error::address_in_use;
}

static inline std::string GetDefaultUrl() ///< Provides default health check URL
{
// Delegates to GetDefaultUrl(size_t) with the fixed DefaultHealthPort.
return GetDefaultUrl(DefaultHealthPort);
// NOTE(review): the statement below is unreachable because the return above always exits first.
// It looks like the intended replacement (port chosen by GetHealthPort()) — confirm which of the
// two returns should remain and delete the other.
return GetDefaultUrl(GetHealthPort());
}

/// Finds a free port in the range [HealthPortRangeStart;HealthPortRangeEnd], if any is available.
///
/// The search starts at a random position in the range and walks forward (wrapping around)
/// until a port that is not in use is found or every port of the range has been probed.
/// Note that the port is only probed, not reserved: nothing here prevents another process
/// from binding it between this call and the moment the caller actually uses it.
///
/// \return The first free port found. If all the ports of the range are in use, an error is
///         logged and the last probed port is returned (subsequent use of it is expected to fail).
static inline size_t GetHealthPort()
{
  // inspired by https://stackoverflow.com/questions/7560114/random-number-c-in-some-range/7560151
  const size_t rangeLength = HealthPortRangeEnd - HealthPortRangeStart + 1;
  std::random_device rd;                                           // obtain a random number from hardware
  std::mt19937 gen(rd());                                          // seed the generator
  std::uniform_int_distribution<size_t> distr(0, rangeLength - 1); // inclusive range of indices

  // Random starting point, so that processes starting together do not all probe the same port first.
  const size_t firstIndex = distr(gen);
  size_t port = HealthPortRangeStart + firstIndex;

  // Probe every port of the range exactly once, including the last candidate.
  for (size_t attempt = 0; attempt < rangeLength; ++attempt) {
    port = HealthPortRangeStart + (firstIndex + attempt) % rangeLength;
    if (!PortInUse(static_cast<unsigned short>(port))) {
      ILOG(Debug, Devel) << "ServiceDiscovery selected port: " << port << ENDM;
      return port;
    }
  }

  ILOG(Error, Support) << "Could not find a free port for the ServiceDiscovery" << ENDM;
  // we keep the last port but all calls will fail.
  return port;
}

/// Provides the default health check URL for a given port.
/// \param port Port number to append to the host name
/// \return The string "<hostname>:<port>", where <hostname> is what boost::asio::ip::host_name() returns
static inline std::string GetDefaultUrl(size_t port)
{
  std::string url = boost::asio::ip::host_name();
  url += ":";
  url += std::to_string(port);
  return url;
}

static constexpr size_t DefaultHealthPort = 7777; ///< Health check default port
static constexpr size_t HealthPortRangeStart = 47800; ///< Health check port range start
static constexpr size_t HealthPortRangeEnd = 47899; ///< Health check port range end

private:
/// Custom deleter of CURL object
Expand All @@ -73,7 +120,7 @@ class ServiceDiscovery
const std::string mConsulUrl; ///< Consul URL
const std::string mName; ///< Instance (service) Name
const std::string mId; ///< Instance (service) ID
std::string mHealthEndpoint; ///< hostname and port of health check endpoint
std::string mHealthUrl; ///< hostname and port of health check endpoint
std::thread mHealthThread; ///< Health check thread
std::atomic<bool> mThreadRunning; ///< Health check thread running flag

Expand Down
3 changes: 1 addition & 2 deletions Framework/src/AggregatorRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ void AggregatorRunner::initServiceDiscovery()
ILOG(Warning, Ops) << "Service Discovery disabled" << ENDM;
return;
}
std::string url = ServiceDiscovery::GetDefaultUrl(ServiceDiscovery::DefaultHealthPort + 2); // we try to avoid colliding with the CheckRunner
mServiceDiscovery = std::make_shared<ServiceDiscovery>(consulUrl, mDeviceName, mDeviceName, url);
mServiceDiscovery = std::make_shared<ServiceDiscovery>(consulUrl, mDeviceName, mDeviceName);
ILOG(Info, Devel) << "ServiceDiscovery initialized";
}

Expand Down
3 changes: 1 addition & 2 deletions Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,7 @@ void CheckRunner::initServiceDiscovery()
ILOG(Warning, Ops) << "Service Discovery disabled" << ENDM;
return;
}
std::string url = ServiceDiscovery::GetDefaultUrl(ServiceDiscovery::DefaultHealthPort + 1); // we try to avoid colliding with the TaskRunner
mServiceDiscovery = std::make_shared<ServiceDiscovery>(mConfig.consulUrl, mDeviceName, mDeviceName, url);
mServiceDiscovery = std::make_shared<ServiceDiscovery>(mConfig.consulUrl, mDeviceName, mDeviceName);
ILOG(Info, Support) << "ServiceDiscovery initialized" << ENDM;
}

Expand Down
13 changes: 6 additions & 7 deletions Framework/src/ServiceDiscovery.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,22 @@
#include "QualityControl/QcInfoLogger.h"
#include <string>
#include <boost/asio.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>

namespace o2::quality_control::core
{

ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& name, const std::string& id, const std::string& healthEndpoint)
: curlHandle(initCurl(), &ServiceDiscovery::deleteCurl), mConsulUrl(url), mName(name), mId(id), mHealthEndpoint(healthEndpoint)
ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& name, const std::string& id, const std::string& healthEndUrl)
: curlHandle(initCurl(), &ServiceDiscovery::deleteCurl), mConsulUrl(url), mName(name), mId(id), mHealthUrl(healthEndUrl)
{
// parameter check
if (mHealthEndpoint.find(':') == std::string::npos) {
mHealthEndpoint = GetDefaultUrl();
if (mHealthUrl.find(':') == std::string::npos) {
mHealthUrl = GetDefaultUrl();
}

mHealthThread = std::thread([=] { runHealthServer(std::stoi(mHealthEndpoint.substr(mHealthEndpoint.find(":") + 1))); });
mHealthThread = std::thread([=] { runHealthServer(std::stoi(mHealthUrl.substr(mHealthUrl.find(":") + 1))); });
_register("");
}

Expand Down Expand Up @@ -81,7 +80,7 @@ void ServiceDiscovery::_register(const std::string& objects)
check.put("Name", "Health check " + mId);
check.put("Interval", "5s");
check.put("DeregisterCriticalServiceAfter", "1m");
check.put("TCP", mHealthEndpoint);
check.put("TCP", mHealthUrl);
checks.push_back(std::make_pair("", check));

pt.put("Name", mName);
Expand Down