From 9abdede04492234abafd8948bc6175dfc9d85e19 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 18 Oct 2018 20:13:15 -0400 Subject: [PATCH 1/2] MINIFICPP-623: Add trace capabilities to controller and agent MINIFICPP-623: Change test port and controller sizes for OSX MINIFICPP-623: avoid ifdef checks that may or may not exist on platforms -- use cmake check --- CMakeLists.txt | 8 + OPS.md | 95 ++++++++++ README.md | 62 +------ controller/Controller.h | 60 +++++-- controller/MiNiFiController.cpp | 21 ++- extensions/http-curl/tests/C2JstackTest.cpp | 164 +++++++++++++++++ extensions/http-curl/tests/CMakeLists.txt | 1 + libminifi/include/FlowController.h | 2 + libminifi/include/SchedulingAgent.h | 13 +- .../include/core/state/UpdateController.h | 14 +- libminifi/include/utils/BackTrace.h | 169 ++++++++++++++++++ libminifi/include/utils/ThreadPool.h | 47 ++++- libminifi/src/FlowController.cpp | 9 + libminifi/src/Properties.cpp | 6 +- libminifi/src/c2/C2Agent.cpp | 58 ++++-- libminifi/src/c2/ControllerSocketProtocol.cpp | 16 ++ libminifi/src/utils/BackTrace.cpp | 132 ++++++++++++++ libminifi/test/unit/BackTraceTests.cpp | 116 ++++++++++++ libminifi/test/unit/ControllerTests.cpp | 4 + 19 files changed, 891 insertions(+), 106 deletions(-) create mode 100644 OPS.md create mode 100644 extensions/http-curl/tests/C2JstackTest.cpp create mode 100644 libminifi/include/utils/BackTrace.h create mode 100644 libminifi/src/utils/BackTrace.cpp create mode 100644 libminifi/test/unit/BackTraceTests.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b0f7a33d5..1c924462b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,7 @@ option(SKIP_TESTS "Skips building all tests." OFF) option(PORTABLE "Instructs the compiler to remove architecture specific optimizations" ON) option(USE_SYSTEM_OPENSSL "Instructs the build system to search for and use an SSL library available in the host system" ON) option(OPENSSL_OFF "Disables OpenSSL" OFF) +option(ENABLE_OPS "Enable Operations Tools" ON) option(USE_SYSTEM_UUID "Instructs the build system to search for and use an UUID library available in the host system" OFF) option(USE_SYSTEM_CURL "Instructs the build system to search for and use a cURL library available in the host system" ON) if (WIN32) @@ -39,6 +40,7 @@ option(USE_SYSTEM_BZIP2 "Instructs the build system to search for and use a bzip option(BUILD_ROCKSDB "Instructs the build system to use RocksDB from the third party directory" ON) option(FORCE_WINDOWS "Instructs the build system to force Windows builds when WIN32 is specified" OFF) +include(CheckIncludeFile) include(FeatureSummary) include(ExternalProject) @@ -73,6 +75,12 @@ if(CCACHE_FOUND) message("-- Found ccache: ${CCACHE_FOUND}") endif(CCACHE_FOUND) +# check for exec info before we enable the backtrace features. +CHECK_INCLUDE_FILE("execinfo.h" HAS_EXECINFO) +if (ENABLE_OPS AND HAS_EXECINFO AND NOT WIN32) + add_definitions("-DHAS_EXECINFO=1") +endif() + #### Establish Project Configuration #### # Enable usage of the VERSION specifier include(CheckCXXCompilerFlag) diff --git a/OPS.md b/OPS.md new file mode 100644 index 0000000000..4b5b080333 --- /dev/null +++ b/OPS.md @@ -0,0 +1,95 @@ + + +# Apache NiFi - MiNiFi - Operations Readme. + + +This readme defines operational commands for managaging instances. + +## Table of Contents + +- [Description](#description) +- [Managing](#managing-minifi) + - [Commands](#commands) + +## Description + +Apache NiFi MiNiFI C++ can can be managed through our [C2 protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal) +or through a local interface called the MiNiFi Controller + +## Managing MiNiFi + +The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs -- utilizing the [Command and Control Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal). Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy. + +The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required. +The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows +you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only +minificontroller on the local host can control the agent: + + $ controller.socket.host=localhost + $ controller.socket.port=9998 + $ controller.socket.local.any.interface=true/false ( default false) + +These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller +will be disabled in your deployment. + + The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below. + Note that with all commands an immediate response by the agent isn't guaranteed. In all cases the agent assumes the role of validating that a response was received, but execution of said command may take some time depending on a number of factors to include persistent storage type, size of queues, and speed of hardware. + +### Debug + + Agents have the ability to return a list of stacks of currently running threads. The Jstack command provides a list of call stacks + for threads within the agent. This may allow users and maintainers to view stacks of running threads to diagnose issues. + +### Commands + #### Specifying connecting information + + ./minificontroller --host "host name" --port "port" + + * By default these options use those defined in minifi.properties and are not required + + #### Start Command + + ./minificontroller --start "component name" + + #### Stack command + ./minificontroller --jstack + + #### Stop command + ./minificontroller --stop "component name" + + #### List connections command + ./minificontroller --list connections + + #### List components command + ./minificontroller --list components + + #### Clear connection command + ./minificontroller --clear "connection name" + + #### GetSize command + ./minificontroller --getsize "connection name" + + * Returns the size of the connection. The current size along with the max will be reported + + #### Update flow + ./minificontroller --updateflow "config yml" + + *Updates the flow file reference and performs a warm re-deploy. + + #### Get full connection command + ./minificontroller --getfull + + *Provides a list of full connections, if any. diff --git a/README.md b/README.md index ead8dba3d7..542dd6cb00 100644 --- a/README.md +++ b/README.md @@ -23,11 +23,12 @@ MiNiFi is a child project effort of Apache NiFi. This repository is for a nativ - [Getting Started](#getting-started) - [System Requirements](#system-requirements) - [Bootstrapping](#bootstrapping) - - [Building](#building) - [Cleaning](#cleaning) - [Configuring](#configuring) - [Running](#running) - [Deploying](#deploying) + - [Extensions](#extensions) +- [Operations](#operations) - [Issue Tracking](#issue-tracking) - [Documentation](#documentation) - [License](#license) @@ -801,66 +802,13 @@ created within the build directory that contains a manifest of build artifacts. The build identifier will be carried with the deployed binary for the configuration you specify. By default all extensions will be built. -### Managing MiNiFi C++ through the MiNiFi Controller - -The MiNiFi controller is an executable in the bin directory that can be used to control the MiNiFi C++ agent while it runs -- utilizing the [Command and Control Protocol](https://cwiki.apache.org/confluence/display/MINIFI/C2+Design+Proposal). Currently the controller will let you stop subcomponents within a running instance, clear queues, get the status of queues, and update the flow for a warm re-deploy. - -The minificontroller can track a single MiNiFi C++ agent through the use of three options. Port is required. -The hostname is not and will default to localhost. Additionally, controller.socket.local.any.interface allows -you to bind to any address when using localhost. Otherwise, we will bind only to the loopback adapter so only -minificontroller on the local host can control the agent: - - $ controller.socket.host=localhost - $ controller.socket.port=9998 - $ controller.socket.local.any.interface=true/false ( default false) - -These are defined by default to the above values. If the port option is left undefined, the MiNiFi controller -will be disabled in your deployment. - - The executable is stored in the bin directory and is titled minificontroller. Available commands are listed below. - Note that with all commands an immediate response by the agent isn't guaranteed. In all cases the agent assumes the role of validating that a response was received, but execution of said command may take some time depending on a number of factors to include persistent storage type, size of queues, and speed of hardware. - - #### Specifying connecting information - - ./minificontroller --host "host name" --port "port" - - * By default these options use those defined in minifi.properties and are not required - - #### Start Command - - ./minificontroller --start "component name" - - #### Stop command - ./minificontroller --stop "component name" - - #### List connections command - ./minificontroller --list connections - - #### List components command - ./minificontroller --list components - - #### Clear connection command - ./minificontroller --clear "connection name" - - #### GetSize command - ./minificontroller --getsize "connection name" - - * Returns the size of the connection. The current size along with the max will be reported - - #### Update flow - ./minificontroller --updateflow "config yml" - - *Updates the flow file reference and performs a warm re-deploy. - - #### Get full connection command - ./minificontroller --getfull - - *Provides a list of full connections, if any. - ### Extensions Please see [Extensions.md](Extensions.md) on how to build and run conditionally built dependencies and extensions. +## Operations +See our [operations documentation for additional inforomation on how to manage instances](OPS.md) + ## Issue Tracking See https://issues.apache.org/jira/projects/MINIFICPP/issues for the issue tracker. diff --git a/controller/Controller.h b/controller/Controller.h index 312b92201d..bbd099b12a 100644 --- a/controller/Controller.h +++ b/controller/Controller.h @@ -125,6 +125,40 @@ int getFullConnections(std::unique_ptr socket, std::ostream return 0; } +int getJstacks(std::unique_ptr socket, std::ostream &out) { + socket->initialize(); + std::vector data; + uint8_t op = minifi::c2::Operation::DESCRIBE; + minifi::io::BaseStream stream; + stream.writeData(&op, 1); + stream.writeUTF("jstack"); + if (socket->writeData(const_cast(stream.getBuffer()), stream.getSize()) < 0) { + return -1; + } + // read the response + uint8_t resp = 0; + socket->readData(&resp, 1); + if (resp == minifi::c2::Operation::DESCRIBE) { + + uint64_t size = 0; + socket->read(size); + + for (int i = 0; i < size; i++) { + std::string name; + uint64_t lines; + socket->readUTF(name); + socket->read(lines); + for (int j = 0; j < lines; j++) { + std::string line; + socket->readUTF(line); + out << name << " -- " << line << std::endl; + } + + } + } + return 0; +} + /** * Prints the connection size for the provided connection. * @param socket socket ptr @@ -168,7 +202,7 @@ int listComponents(std::unique_ptr socket, std::ostream &out out << "Components:" << std::endl; for (int i = 0; i < responses; i++) { - std::string name,status; + std::string name, status; socket->readUTF(name, false); socket->readUTF(status, false); out << name << ", running: " << status << std::endl; @@ -244,7 +278,7 @@ std::shared_ptr getControllerService(const return service; } - void printManifest(const std::shared_ptr &configuration) { +void printManifest(const std::shared_ptr &configuration) { std::string prov_repo_class = "volatileprovenancerepository"; std::string flow_repo_class = "volatileflowfilerepository"; @@ -252,12 +286,12 @@ std::shared_ptr getControllerService(const std::string content_repo_class = "volatilecontentrepository"; std::shared_ptr log_properties = std::make_shared(); - log_properties->setHome("./"); - log_properties->set("appender.stdout","stdout"); - log_properties->set("logger.org::apache::nifi::minifi","OFF,stdout"); - logging::LoggerConfiguration::getConfiguration().initialize(log_properties); + log_properties->setHome("./"); + log_properties->set("appender.stdout", "stdout"); + log_properties->set("logger.org::apache::nifi::minifi", "OFF,stdout"); + logging::LoggerConfiguration::getConfiguration().initialize(log_properties); - configuration->set(minifi::Configure::nifi_flow_configuration_file,"../conf/config.yml"); + configuration->set(minifi::Configure::nifi_flow_configuration_file, "../conf/config.yml"); configuration->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); // Create repos for flow record and provenance std::shared_ptr prov_repo = core::createRepository(prov_repo_class, true, "provenance"); @@ -280,11 +314,11 @@ std::shared_ptr getControllerService(const minifi::setDefaultDirectory(content_repo_path); } - configuration->set("c2.agent.heartbeat.period","25"); - configuration->set("nifi.c2.root.classes","AgentInformation"); - configuration->set("nifi.c2.enable","true"); - configuration->set("c2.agent.listen","true"); - configuration->set("c2.agent.heartbeat.reporter.classes","AgentPrinter"); + configuration->set("c2.agent.heartbeat.period", "25"); + configuration->set("nifi.c2.root.classes", "AgentInformation"); + configuration->set("nifi.c2.enable", "true"); + configuration->set("c2.agent.listen", "true"); + configuration->set("c2.agent.heartbeat.reporter.classes", "AgentPrinter"); configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); @@ -293,7 +327,7 @@ std::shared_ptr getControllerService(const std::unique_ptr flow_configuration = core::createFlowConfiguration(prov_repo, flow_repo, content_repo, configuration, stream_factory, nifi_configuration_class_name); std::shared_ptr controller = std::unique_ptr( - new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo,"manifest",false)); + new minifi::FlowController(prov_repo, flow_repo, configuration, std::move(flow_configuration), content_repo, "manifest", false)); controller->load(); controller->start(); std::this_thread::sleep_for(std::chrono::milliseconds(10000)); diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp index b3856852b1..06bba2c53d 100644 --- a/controller/MiNiFiController.cpp +++ b/controller/MiNiFiController.cpp @@ -129,6 +129,7 @@ int main(int argc, char **argv) { ("getsize", "Reports the size of the associated connection queue", cxxopts::value>()) //NOLINT ("updateflow", "Updates the flow of the agent using the provided flow file", cxxopts::value()) //NOLINT ("getfull", "Reports a list of full connections") //NOLINT + ("jstack", "Returns backtraces from the agent") //NOLINT ("manifest", "Generates a manifest for the current binary") //NOLINT ("noheaders", "Removes headers from output streams"); @@ -191,13 +192,12 @@ int main(int argc, char **argv) { auto& components = result["c"].as>(); for (const auto& connection : components) { auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); - if (clearConnection(std::move(socket), connection)){ + if (clearConnection(std::move(socket), connection)) { std::cout << "Sent clear command to " << connection << ". Size before clear operation sent: " << std::endl; socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); if (getConnectionSize(std::move(socket), std::cout, connection) < 0) - std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; - } - else + std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; + } else std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; } } @@ -231,6 +231,12 @@ int main(int argc, char **argv) { std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; } + if (result.count("jstack") > 0) { + auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); + if (getJstacks(std::move(socket), std::cout) < 0) + std::cout << "Could not connect to remote host " << host << ":" << port << std::endl; + } + if (result.count("updateflow") > 0) { auto& flow_file = result["updateflow"].as(); auto socket = secure_context != nullptr ? stream_factory_->createSecureSocket(host, port, secure_context) : stream_factory_->createSocket(host, port); @@ -241,10 +247,9 @@ int main(int argc, char **argv) { if (result.count("manifest") > 0) { printManifest(configuration); } - }catch (const std::exception &exc) - { - // catch anything thrown within try block that derives from std::exception - std::cerr << exc.what() << std::endl; + } catch (const std::exception &exc) { + // catch anything thrown within try block that derives from std::exception + std::cerr << exc.what() << std::endl; } catch (...) { std::cout << options.help( { "", "Group" }) << std::endl; exit(0); diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp new file mode 100644 index 0000000000..41413125ce --- /dev/null +++ b/extensions/http-curl/tests/C2JstackTest.cpp @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#undef NDEBUG +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include +#include "protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + + +class ConfigHandler : public CivetHandler { + public: + ConfigHandler() { + calls_ = 0; + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"describe\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"jstack\"" + "}]}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + heartbeat_response.length()); + mg_printf(conn, "%s", heartbeat_response.c_str()); + + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; + std::atomic calls_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setDebug(); + LogTestController::getInstance().setTrace(); + + const char *options[] = { "document_root", ".", "listening_ports", "8727", 0 }; + std::vector cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + + + std::shared_ptr configuration = std::make_shared(); + + configuration->set("c2.rest.url", "http://localhost:8727/update"); + configuration->set("c2.agent.heartbeat.period", "1000"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr test_repo = std::make_shared(); + std::shared_ptr test_flow_repo = std::make_shared(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr stream_factory = minifi::io::StreamFactory::getInstance(configuration); + std::shared_ptr content_repo = std::make_shared(); + std::unique_ptr yaml_ptr = std::unique_ptr( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr repo = std::static_pointer_cast(test_repo); + + std::shared_ptr controller = std::make_shared(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr pg = std::shared_ptr(ptr.get()); + ptr.release(); + auto start = std::chrono::system_clock::now(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast(then - start).count(); + std::string logs = LogTestController::getInstance().log_output.str(); + #ifndef WIN32 + assert(logs.find("SchedulingAgent") != std::string::npos); + #endif + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); + + return 0; +} diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 721c2215e7..b8d6c6979a 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -71,6 +71,7 @@ message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test fi add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") +add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/") add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/") diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 0466546afb..9309b4f20d 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -319,6 +319,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi virtual uint64_t getUptime(); + virtual std::vector getTraces(); + void initializeC2(); protected: diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 682f6ec51e..925efdb176 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -29,6 +29,7 @@ #include #include "utils/TimeUtil.h" #include "utils/ThreadPool.h" +#include "utils/BackTrace.h" #include "core/Core.h" #include "core/logging/LoggerConfiguration.h" #include "properties/Configure.h" @@ -90,7 +91,7 @@ class SingleRunMonitor : public TimerAwareMonitor { : TimerAwareMonitor(run_monitor) { } explicit SingleRunMonitor(TimerAwareMonitor &&other) - : TimerAwareMonitor(std::move(other)){ + : TimerAwareMonitor(std::move(other)) { } virtual bool isFinished(const uint64_t &result) { if (result == 0) { @@ -123,7 +124,11 @@ class SchedulingAgent { running_ = false; repo_ = repo; flow_repo_ = flow_repo; - auto pool = utils::ThreadPool(configure_->getInt(Configure::nifi_flow_engine_threads, 2), true, controller_service_provider); + /** + * To facilitate traces we cannot use daemon threads -- this could potentially cause blocking on I/O; however, it's a better path + * to be able to debug why an agent doesn't work and still allow a restart via updates in these cases. + */ + auto pool = utils::ThreadPool(configure_->getInt(Configure::nifi_flow_engine_threads, 2), false, controller_service_provider, "SchedulingAgent"); thread_pool_ = std::move(pool); thread_pool_.start(); } @@ -148,6 +153,10 @@ class SchedulingAgent { thread_pool_.shutdown(); } + std::vector getTraces() { + return thread_pool_.getTraces(); + } + public: virtual std::future enableControllerService(std::shared_ptr &serviceNode); virtual std::future disableControllerService(std::shared_ptr &serviceNode); diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h index fec3cc1240..7cd61c459d 100644 --- a/libminifi/include/core/state/UpdateController.h +++ b/libminifi/include/core/state/UpdateController.h @@ -20,6 +20,7 @@ #include #include "utils/ThreadPool.h" +#include "utils/BackTrace.h" namespace org { namespace apache { @@ -69,9 +70,9 @@ class UpdateStatus { class Update { public: - Update() - : status_(UpdateStatus(UpdateState::INITIATE, 0)) { - } + Update() + : status_(UpdateStatus(UpdateState::INITIATE, 0)) { + } Update(UpdateStatus status) : status_(status) { @@ -235,6 +236,13 @@ class StateMonitor : public StateController { */ virtual uint64_t getUptime() = 0; + /** + * Returns a vector of backtraces + * @return backtraces from the state monitor. + */ + virtual std::vector getTraces() = 0; + + protected: std::atomic controller_running_; }; diff --git a/libminifi/include/utils/BackTrace.h b/libminifi/include/utils/BackTrace.h new file mode 100644 index 0000000000..5c7bb80019 --- /dev/null +++ b/libminifi/include/utils/BackTrace.h @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ +#define LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ + +#ifdef HAS_EXECINFO +#include +#include +#endif +#include +#include +#include +#include +#include + +#define TRACE_BUFFER_SIZE 128 + +/** + * Forward declaration allows us to tightly couple TraceResolver + * with BackTrace. + */ +class TraceResolver; + +/** + * Purpose: Backtrace is a movable vector of trace lines. + * + */ +class BackTrace { + public: + BackTrace() { + } + BackTrace(const std::string &name) + : name_(name) { + } + BackTrace(BackTrace &&) = default; + BackTrace(BackTrace &) = delete; + + std::vector getTraces() const { + return trace_; + } + + BackTrace &operator=(BackTrace &&other) = default; + + /** + * Return thread name of f this caller + * @returns name ; + */ + std::string getName() const { + return name_; + } + + protected: + void addLine(const std::string &symbol_line) { + trace_.emplace_back(symbol_line); + } + + private: + std::string name_; + std::vector trace_; + friend class TraceResolver; +}; + +/** + * Pulls the trace and places it onto the TraceResolver instance. + */ +void pull_trace(const uint8_t frames_to_skip = 1); + +#ifdef HAS_EXECINFO +/** + * Signal handler that will run via TraceResolver + */ +void handler(int signr, siginfo_t *info, void *secret); +#endif +/** + * Emplaces a signal handler for SIGUSR2 + */ +void emplace_handler(); + +/** + * Purpose: Provides a singular instance to grab the call stack for thread(s). + * Design: is a singleton to avoid multiple signal handlers. + */ +class TraceResolver { + public: + + /** + * Retrieves the backtrace for the provided thread reference + * @return BackTrace instance + */ + BackTrace &&getBackTrace(const std::string &thread_name, std::thread::native_handle_type thread); + + /** + * Retrieves the backtrace for the calling thread + * @returns BackTrace instance + */ + BackTrace &&getBackTrace(const std::string &thread_name) { +#ifdef WIN32 + // currrently not supported in windows + return BackTrace(thread_name); +#else + return std::move(getBackTrace(thread_name, pthread_self())); +#endif + } + + /** + * Returns a static instance of the thread resolver. + */ + static TraceResolver &getResolver() { + static TraceResolver resolver; + return resolver; + } + + /** + * Adds a trace line with an optional function + * @param symbol_line symbol line that was produced + * @param func function name + */ + void addTraceLine(const char *symbol_line, const char *func = nullptr) { + std::stringstream line; + line << symbol_line; + if (nullptr != func) { + line << " @" << func; + } + trace_.addLine(line.str()); + } + + /** + * Returns the thread handle reference in the native format. + */ + const std::thread::native_handle_type getThreadHandle() { + return thread_handle_; + } + + /** + * Returns the caller handle refernce in the native format. + */ + const std::thread::native_handle_type getCallerHandle() { + return caller_handle_; + } + + private: + TraceResolver() // can't use = default due to handle_types not defaulting. + : thread_handle_(0), + caller_handle_(0) { + ; + } + + BackTrace trace_; + std::thread::native_handle_type thread_handle_; + std::thread::native_handle_type caller_handle_; + std::mutex mutex_; +}; + +#endif /* LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ */ + diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 9fc47f5183..ffb28fef3e 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -18,6 +18,7 @@ #define LIBMINIFI_INCLUDE_THREAD_POOL_H #include +#include #include #include #include @@ -28,6 +29,7 @@ #include #include +#include "BackTrace.h" #include "capi/expect.h" #include "controllers/ThreadManagementService.h" #include "concurrentqueue.h" @@ -189,17 +191,20 @@ std::shared_ptr> Worker::getPromise() { class WorkerThread { public: - explicit WorkerThread(std::thread thread) + explicit WorkerThread(std::thread thread, const std::string &name = "NamelessWorker") : is_running_(false), - thread_(std::move(thread)) { + thread_(std::move(thread)), + name_(name) { } - WorkerThread() - : is_running_(false) { + WorkerThread(const std::string &name = "NamelessWorker") + : is_running_(false), + name_(name) { } std::atomic is_running_; std::thread thread_; + std::string name_; }; /** @@ -212,13 +217,15 @@ template class ThreadPool { public: - ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr &controller_service_provider = nullptr) + ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, const std::shared_ptr &controller_service_provider = nullptr, + const std::string &name = "NamelessPool") : daemon_threads_(daemon_threads), thread_reduction_count_(0), max_worker_threads_(max_worker_threads), adjust_threads_(false), running_(false), - controller_service_provider_(controller_service_provider) { + controller_service_provider_(controller_service_provider), + name_(name) { current_workers_ = 0; task_count_ = 0; thread_manager_ = nullptr; @@ -231,7 +238,8 @@ class ThreadPool { adjust_threads_(false), running_(false), controller_service_provider_(std::move(other.controller_service_provider_)), - thread_manager_(std::move(other.thread_manager_)) { + thread_manager_(std::move(other.thread_manager_)), + name_(std::move(other.name_)) { current_workers_ = 0; task_count_ = 0; } @@ -264,6 +272,22 @@ class ThreadPool { return task_status_[identifier] == true; } + std::vector getTraces() { + std::vector traces; + std::lock_guard lock(manager_mutex_); + std::unique_lock wlock(worker_queue_mutex_); + // while we may be checking if running, we don't want to + // use the threads outside of the manager mutex's lock -- therefore we will + // obtain a lock so we can keep the threads in memory + if (running_) { + for (const auto &worker : thread_queue_) { + if (worker->is_running_) + traces.emplace_back(TraceResolver::getResolver().getBackTrace(worker->name_, worker->thread_.native_handle())); + } + } + return traces; + } + /** * Starts the Thread Pool */ @@ -315,6 +339,8 @@ class ThreadPool { if (!running_) { start(); } + + name_ = other.name_; return *this; } @@ -367,6 +393,8 @@ class ThreadPool { std::recursive_mutex manager_mutex_; // work queue mutex std::mutex worker_queue_mutex_; + // thread pool name + std::string name_; /** * Call for the manager to start worker threads @@ -404,7 +432,9 @@ bool ThreadPool::execute(Worker &&task, std::future &future) { template void ThreadPool::manageWorkers() { for (int i = 0; i < max_worker_threads_; i++) { - auto worker_thread = std::make_shared(); + std::stringstream thread_name; + thread_name << name_ << " #" << i; + auto worker_thread = std::make_shared(thread_name.str()); worker_thread->thread_ = createThread(std::bind(&ThreadPool::run_tasks, this, worker_thread)); thread_queue_.push_back(worker_thread); current_workers_++; @@ -461,6 +491,7 @@ void ThreadPool::manageWorkers() { template void ThreadPool::run_tasks(std::shared_ptr thread) { auto waitperiod = std::chrono::milliseconds(1) * 100; + thread->is_running_ = true; uint64_t wait_decay_ = 0; uint64_t yield_backoff = 10; // start at 10 ms while (running_.load()) { diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 9206f419b3..25b4fc324d 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -910,6 +910,15 @@ uint64_t FlowController::getUptime() { return time_since; } +std::vector FlowController::getTraces() { + std::vector traces; + auto timer_driven = timer_scheduler_->getTraces(); + traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end())); + auto event_driven = event_scheduler_->getTraces(); + traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end())); + return traces; +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index e64a92f789..c8cb341b5b 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -26,7 +26,7 @@ namespace apache { namespace nifi { namespace minifi { -#define BUFFER_SIZE 512 +#define TRACE_BUFFER_SIZE 512 Properties::Properties() : logger_(logging::LoggerFactory::getLogger()) { @@ -138,8 +138,8 @@ void Properties::loadConfigureFile(const char *fileName) { } this->clear(); - char buf[BUFFER_SIZE]; - for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { + char buf[TRACE_BUFFER_SIZE]; + for (file.getline(buf, TRACE_BUFFER_SIZE); file.good(); file.getline(buf, TRACE_BUFFER_SIZE)) { parseConfigureFileLine(buf); } } diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index ebb45492cf..8168c77fa6 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -173,21 +173,25 @@ void C2Agent::configure(const std::shared_ptr &configure, bool reconf if (allow_updates_) { if (!configure->get("nifi.c2.agent.update.command", "c2.agent.update.command", update_command_)) { char cwd[1024]; - getcwd(cwd, sizeof(cwd)); + if (getcwd(cwd, sizeof(cwd)) == nullptr) { + logger_->log_error("Could not set update command, reason %s", std::strerror(errno)); - std::stringstream command; - command << cwd << "/minifi.sh update"; - update_command_ = command.str(); + } else { + std::stringstream command; + command << cwd << "/minifi.sh update"; + update_command_ = command.str(); + } } if (!configure->get("nifi.c2.agent.update.temp.location", "c2.agent.update.temp.location", update_location_)) { char cwd[1024]; - getcwd(cwd, sizeof(cwd)); - - std::stringstream copy_path; - std::stringstream command; - - copy_path << cwd << "/minifi.update"; + if (getcwd(cwd, sizeof(cwd)) == nullptr) { + logger_->log_error("Could not set copy path, reason %s", std::strerror(errno)); + } else { + std::stringstream copy_path; + std::stringstream command; + copy_path << cwd << "/minifi.update"; + } } // if not defined we won't beable to update @@ -536,6 +540,31 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { enqueue_c2_response(std::move(response)); return; + } else if (resp.name == "jstack") { + if (update_sink_->isRunning()) { + const std::vector traces = update_sink_->getTraces(); + for (const auto &trace : traces) { + for (const auto & line : trace.getTraces()) { + logger_->log_trace("%s -- %s", trace.getName(), line); + } + } + auto keys = configuration_->getConfiguredKeys(); + C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); + response.setLabel("configuration_options"); + for (const auto &trace : traces) { + C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true); + options.setLabel(trace.getName()); + std::string value; + for (const auto &line : trace.getTraces()) { + C2ContentResponse option(Operation::ACKNOWLEDGE); + option.name = line; + option.operation_arguments[line] = line; + options.addContent(std::move(option)); + } + response.addPayload(std::move(options)); + } + enqueue_c2_response(std::move(response)); + } } C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); enqueue_c2_response(std::move(response)); @@ -720,14 +749,19 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { void C2Agent::restart_agent() { char cwd[1024]; - getcwd(cwd, sizeof(cwd)); + if (getcwd(cwd, sizeof(cwd)) == nullptr) { + logger_->log_error("Could not restart agent, reason %s", std::strerror(errno)); + return; + } std::stringstream command; command << cwd << "/minifi.sh restart"; } void C2Agent::update_agent() { - system(update_command_.c_str()); + if (!system(update_command_.c_str())) { + logger_->log_warn("May not have command processor"); + } } int16_t C2Agent::setResponseNodes(const std::shared_ptr &metric) { diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index 2bd6d4d82f..d4f3970600 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -187,6 +187,22 @@ void ControllerSocketProtocol::initialize(const std::shared_ptrisRunning() ? "true" : "false"); } stream->writeData(const_cast(resp.getBuffer()), resp.getSize()); + } else if (what == "jstack") { + io::BaseStream resp; + resp.writeData(&head, 1); + auto traces = update_sink_->getTraces(); + uint64_t trace_size = traces.size(); + resp.write(trace_size); + for (const auto &trace : traces) { + const auto &lines = trace.getTraces(); + resp.writeUTF(trace.getName()); + uint64_t lsize = lines.size(); + resp.write(lsize); + for (const auto &line : lines) { + resp.writeUTF(line); + } + } + stream->writeData(const_cast(resp.getBuffer()), resp.getSize()); } else if (what == "connections") { io::BaseStream resp; resp.writeData(&head, 1); diff --git a/libminifi/src/utils/BackTrace.cpp b/libminifi/src/utils/BackTrace.cpp new file mode 100644 index 0000000000..160a0701ac --- /dev/null +++ b/libminifi/src/utils/BackTrace.cpp @@ -0,0 +1,132 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/BackTrace.h" +#ifdef HAS_EXECINFO +#include +#include +#include +#endif +#define NAME_SIZE 256 + +void pull_trace(const uint8_t frames_to_skip) { +#ifdef HAS_EXECINFO + void *stackBuffer[TRACE_BUFFER_SIZE + 1]; + + // retrieve current stack addresses + int trace_size = backtrace(stackBuffer, TRACE_BUFFER_SIZE); + + char **symboltable = backtrace_symbols(stackBuffer, trace_size); + /** + * we can skip the signal handler, call to pull_trace, and the first entry for backtrace_symbols + */ + for (int i = frames_to_skip; i < trace_size; i++) { + char *start_parenthetical = 0; + char *functor = 0; + char *stop_parenthetical = 0; + + for (char *p = symboltable[i]; *p; ++p) { + if (*p == '(') { + start_parenthetical = p; + } else if (*p == '+') { + functor = p; + } else if (*p == ')' && functor) { + stop_parenthetical = p; + break; + } + } + bool hasFunc = start_parenthetical && functor && stop_parenthetical; + if (hasFunc && start_parenthetical < functor) { + *start_parenthetical++ = '\0'; + *functor++ = '\0'; + *stop_parenthetical = '\0'; + + /** + * Demangle the names -- this requires calling cxx api to demangle the function name. + * not sending an allocated buffer, so we'll deallocate if status is zero. + */ + + int status; + + auto demangled = abi::__cxa_demangle(start_parenthetical, nullptr, nullptr, &status); + if (status == 0) { + TraceResolver::getResolver().addTraceLine(symboltable[i], demangled); + free(demangled); + } else { + TraceResolver::getResolver().addTraceLine(symboltable[i], start_parenthetical); + } + } else { + TraceResolver::getResolver().addTraceLine(symboltable[i], ""); + } + } + + free(symboltable); +#endif +} + +BackTrace &&TraceResolver::getBackTrace(const std::string &thread_name, std::thread::native_handle_type thread_handle) { + // lock so that we only perform one backtrace at a time. +#ifdef HAS_EXECINFO + std::lock_guard lock(mutex_); + + caller_handle_ = pthread_self(); + thread_handle_ = thread_handle; + trace_ = BackTrace(thread_name); + + if (0 == thread_handle_ || pthread_equal(caller_handle_, thread_handle)) { + pull_trace(); + } else { + if (thread_handle_ == 0) { + return std::move(trace_); + } + emplace_handler(); + if (pthread_kill(thread_handle_, SIGUSR2) != 0) { + return std::move(trace_); + } + sigset_t mask; + sigfillset(&mask); + sigdelset(&mask, SIGUSR2); + sigsuspend(&mask); + } +#else + // even if tracing is disabled, include thread name into the trace object + trace_ = BackTrace(thread_name); +#endif + return std::move(trace_); +} +#ifdef HAS_EXECINFO +void handler(int signr, siginfo_t *info, void *secret) { + auto curThread = pthread_self(); + + // not the intended thread + if (!pthread_equal(curThread, TraceResolver::getResolver().getThreadHandle())) { + return; + } + + pull_trace(); + + pthread_kill(TraceResolver::getResolver().getCallerHandle(), SIGUSR2); +} +#endif + +void emplace_handler() { +#ifdef HAS_EXECINFO + struct sigaction sa; + sigfillset(&sa.sa_mask); + sa.sa_flags = SA_SIGINFO; + sa.sa_sigaction = handler; + sigaction(SIGUSR2, &sa, NULL); +#endif +} diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp new file mode 100644 index 0000000000..816ff636b0 --- /dev/null +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -0,0 +1,116 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "../TestBase.h" +#include "utils/BackTrace.h" + +bool function() { + return true; +} + +class WorkerNumberExecutions : public utils::AfterExecute { + public: + explicit WorkerNumberExecutions(int tasks) + : runs(0), + tasks(tasks) { + } + + explicit WorkerNumberExecutions(WorkerNumberExecutions && other) + : runs(std::move(other.runs)), + tasks(std::move(other.tasks)) { + } + + ~WorkerNumberExecutions() { + } + + virtual bool isFinished(const int &result) { + if (result > 0 && ++runs < tasks) { + return false; + } else { + return true; + } + } + virtual bool isCancelled(const int &result) { + return false; + } + + int getRuns() { + return runs; + } + + virtual int64_t wait_time() { + // wait 50ms + return 50; + } + + protected: + int runs; + int tasks; +}; + +TEST_CASE("BT1", "[TPT1]") { + const BackTrace trace = TraceResolver::getResolver().getBackTrace("BT1"); +#ifdef HAS_EXECINFO + REQUIRE(!trace.getTraces().empty()); +#endif +} + +std::atomic counter; + +int counterFunction() { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + return ++counter; +} + +TEST_CASE("BT2", "[TPT2]") { + counter = 0; + utils::ThreadPool pool(4); + pool.start(); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for (int i = 0; i < 3; i++) { + std::function f_ex = counterFunction; + std::unique_ptr> after_execute = std::unique_ptr>(new WorkerNumberExecutions(5)); + utils::Worker functor(f_ex, "id", std::move(after_execute)); + + std::future fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); + } + + std::function f_ex = counterFunction; + std::unique_ptr> after_execute = std::unique_ptr>(new WorkerNumberExecutions(5)); + utils::Worker functor(f_ex, "id", std::move(after_execute)); + + std::future fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); + + std::vector traces = pool.getTraces(); + for (const auto &trace : traces) { + const auto &trace_strings = trace.getTraces(); +#ifdef HAS_EXECINFO + REQUIRE(trace_strings.size() > 2); + if (trace_strings.at(0).find("sleep_for") != std::string::npos) { + REQUIRE(trace_strings.at(1).find("counterFunction") != std::string::npos); + } +#endif + } + fut.wait(); +} + diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp index c5268ab41a..0e755664fc 100644 --- a/libminifi/test/unit/ControllerTests.cpp +++ b/libminifi/test/unit/ControllerTests.cpp @@ -110,6 +110,10 @@ class TestUpdateSink : public minifi::state::StateMonitor { virtual int16_t pause() { return 0; } + virtual std::vector getTraces() { + std::vector traces; + return traces; + } /** * Operational controllers From 2cf37798b4cfcf8e29824788596c4420397e8181 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Mon, 22 Oct 2018 07:08:32 -0400 Subject: [PATCH 2/2] MINIFICPP-623: Clarify Readme --- OPS.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/OPS.md b/OPS.md index 4b5b080333..36a948f61f 100644 --- a/OPS.md +++ b/OPS.md @@ -51,7 +51,9 @@ will be disabled in your deployment. ### Debug Agents have the ability to return a list of stacks of currently running threads. The Jstack command provides a list of call stacks - for threads within the agent. This may allow users and maintainers to view stacks of running threads to diagnose issues. + for threads within the agent. This may allow users and maintainers to view stacks of running threads to diagnose issues. The name + is an homage to the jstack command used by Java developers. The design is fundamentally the same as that of Java -- signal handlers + notify signals to interrupt and provide traces. This feature is currently not built into Windows builds. ### Commands #### Specifying connecting information @@ -66,7 +68,7 @@ will be disabled in your deployment. #### Stack command ./minificontroller --jstack - + #### Stop command ./minificontroller --stop "component name"