From 0283c00edb797e7f60c50d0e6dda9b62e9b909e9 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Mon, 6 Aug 2018 15:49:04 -0400 Subject: [PATCH] MINIFICPP-592: Update RPG to fall back when cURL is not enable --- libminifi/include/RemoteProcessorGroupPort.h | 28 +++++++++++++---- libminifi/src/RemoteProcessorGroupPort.cpp | 33 +++++++++++++++----- 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index aece744093..170bb49c48 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -44,20 +44,20 @@ namespace minifi { * and decrements based on its construction. Using RAII we should * never have the concern of thread safety. */ -class RPGLatch{ +class RPGLatch { public: - RPGLatch(bool increment = true){ - static std::atomic latch_count (0); + RPGLatch(bool increment = true) { + static std::atomic latch_count(0); count = &latch_count; if (increment) count++; } - ~RPGLatch(){ + ~RPGLatch() { count--; } - int getCount(){ + int getCount() { return *count; } @@ -80,8 +80,9 @@ class RemoteProcessorGroupPort : public core::Processor { timeout_(0), url_(url), http_enabled_(false), + bypass_rest_api_(false), ssl_service(nullptr), - logger_(logging::LoggerFactory::getLogger()){ + logger_(logging::LoggerFactory::getLogger()) { client_type_ = sitetosite::CLIENT_TYPE::RAW; stream_factory_ = stream_factory; if (uuid != nullptr) { @@ -168,6 +169,19 @@ class RemoteProcessorGroupPort : public core::Processor { protected: + /** + * Non static in case anything is loaded when this object is re-scheduled + */ + bool is_http_disabled() { + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient"); + if (ptr != nullptr) { + delete ptr; + return false; + } else { + return true; + } + } + std::shared_ptr stream_factory_; std::unique_ptr getNextProtocol(bool create); void returnProtocol(std::unique_ptr protocol); @@ -195,6 +209,8 @@ class RemoteProcessorGroupPort : public core::Processor { // http proxy utils::HTTPProxy proxy_; + bool bypass_rest_api_; + sitetosite::CLIENT_TYPE client_type_; // Remote Site2Site Info diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 629075c32a..11db82e5e2 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -66,7 +66,7 @@ std::unique_ptr RemoteProcessorGroupPort::getNextP if (!available_protocols_.try_dequeue(nextProtocol)) { if (create) { // create - if (url_.empty()) { + if (url_.empty() || bypass_rest_api_) { sitetosite::SiteToSiteClientConfiguration config(stream_factory_, std::make_shared(protocol_uuid_, host_, port_, ssl_service != nullptr), this->getInterface(), client_type_); config.setHTTPProxy(this->proxy_); nextProtocol = sitetosite::createClient(config); @@ -164,6 +164,20 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr 0) peer_index_ = 0; } + /** + * If at this point we have no peers and HTTP support is disabled this means + * we must rely on the configured host/port + */ + if (peers_.empty() && is_http_disabled()) { + context->getProperty(hostName.getName(), host_); + + int64_t lvalue; + if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) { + port_ = static_cast(lvalue); + site2site_port_ = port_; + } + bypass_rest_api_ = true; + } // populate the site2site protocol for load balancing between them if (peers_.size() > 0) { auto count = peers_.size(); @@ -183,6 +197,8 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptrlog_trace("Created client, moving into available protocols"); returnProtocol(std::move(nextProtocol)); } + } else { + // we don't have any peers } } @@ -209,7 +225,8 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptrlog_trace("On trigger %s", getUUIDStr()); - if (url_.empty()) { + /* + if (url_.empty() || !curl_enabled_) { if (context->getProperty(hostName.getName(), value) && !value.empty()) { host_ = value; } @@ -222,7 +239,7 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptrgetProperty(portUUID.getName(), value) && !value.empty()) { uuid_parse(value.c_str(), protocol_uuid_); - } + }*/ std::unique_ptr protocol_ = nullptr; try { @@ -257,7 +274,6 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { std::string fullUrl = this->protocol_ + this->host_ + ":" + std::to_string(this->port_) + "/nifi-api/site-to-site"; - this->site2site_port_ = -1; configure_->get(Configure::nifi_rest_api_user_name, this->rest_user_name_); configure_->get(Configure::nifi_rest_api_password, this->rest_password_); @@ -290,9 +306,10 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient"); if (nullptr == client_ptr) { - logger_->log_error("Could not locate HTTPClient. You do not have cURL support!"); + logger_->log_error("Could not locate HTTPClient. You do not have cURL support, defaulting to base configuration!"); return; } + this->site2site_port_ = -1; client = std::unique_ptr(dynamic_cast(client_ptr)); client->initialize("GET", fullUrl.c_str(), ssl_service); if (ssl_service) { @@ -315,7 +332,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { const std::vector &response_body = client->getResponseBody(); if (!response_body.empty()) { std::string controller = std::string(response_body.begin(), response_body.end()); - logger_->log_debug("controller config %s", controller); + logger_->log_trace("controller config %s", controller); rapidjson::Document doc; rapidjson::ParseResult ok = doc.Parse(controller.c_str()); @@ -348,8 +365,10 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { void RemoteProcessorGroupPort::refreshPeerList() { refreshRemoteSite2SiteInfo(); - if (site2site_port_ == -1) + if (site2site_port_ == -1) { + logger_->log_debug("No port configured"); return; + } this->peers_.clear();