From b4da2d74ab620813d2f3b04120577fef45cc0005 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 21 Dec 2017 19:59:29 -0500 Subject: [PATCH 1/2] MINIFICPP-355: Resolve issue with 32-bit systems using strtol converting to long long. Change log to INFO by default Disable C2 services to we don't consume unnecessary threads. Reduce logging verbosity in cases where we are printing debug messages to info. Resolve spurious test failure because of port collision --- conf/minifi-log.properties | 2 +- conf/minifi.properties | 2 + libminifi/include/core/Property.h | 6 +- libminifi/src/core/ConfigurableComponent.cpp | 8 +-- libminifi/src/io/ClientSocket.cpp | 14 ++-- libminifi/src/sitetosite/SiteToSiteClient.cpp | 72 +++++++++---------- libminifi/test/curl-tests/CMakeLists.txt | 2 +- libminifi/test/resources/ThreadPoolAdjust.yml | 4 +- 8 files changed, 56 insertions(+), 54 deletions(-) diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties index 99caa1bbdb..dc25406158 100644 --- a/conf/minifi-log.properties +++ b/conf/minifi-log.properties @@ -36,7 +36,7 @@ appender.rolling.max_file_size=5242880 logger.root=INFO,rolling #Logging configurable by namespace -logger.org::apache::nifi::minifi=DEBUG,rolling +logger.org::apache::nifi::minifi=INFO,rolling #Logging configurable by class fully qualified name #logger.org::apache::nifi::minifi::core::logging::LoggerConfiguration=DEBUG diff --git a/conf/minifi.properties b/conf/minifi.properties index 8e71818875..6c75305409 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -15,6 +15,8 @@ # Core Properties # nifi.version=0.1.0 +#disable the c2 services +nifi.c2.enable=false nifi.flow.configuration.file=./conf/config.yml nifi.administrative.yield.duration=30 sec # If a component has no work to do (is "bored"), how long should we wait before checking again for work? diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h index 806f870db8..a8b5c8650d 100644 --- a/libminifi/include/core/Property.h +++ b/libminifi/include/core/Property.h @@ -172,7 +172,7 @@ class Property { const char *cvalue = input.c_str(); char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); + auto ival = std::strtoll(cvalue, &pEnd, 0); if (pEnd[0] == '\0') { return false; @@ -226,7 +226,7 @@ class Property { const char *cvalue = input.c_str(); char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); + auto ival = std::strtoll(cvalue, &pEnd, 0); if (pEnd[0] == '\0') { return false; @@ -281,7 +281,7 @@ class Property { const char *cvalue = input.c_str(); char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); + auto ival = std::strtoll(cvalue, &pEnd, 0); if (pEnd[0] == '\0') { output = ival; diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index b9a3a793a6..0273aa2e57 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -68,7 +68,7 @@ bool ConfigurableComponent::getProperty(const std::string name, std::string &val if (it != properties_.end()) { Property item = it->second; value = item.getValue(); - logger_->log_info("Component %s property name %s value %s", name, item.getName(), value); + logger_->log_debug("Component %s property name %s value %s", name, item.getName(), value); return true; } else { return false; @@ -88,7 +88,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu Property item = it->second; item.setValue(value); properties_[item.getName()] = item; - logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); + logger_->log_debug("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); return true; } else { return false; @@ -109,7 +109,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s Property item = it->second; item.addValue(value); properties_[item.getName()] = item; - logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); + logger_->log_debug("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); return true; } else { return false; @@ -130,7 +130,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { Property item = it->second; item.setValue(value); properties_[item.getName()] = item; - logger_->log_info("property name %s value %s", prop.getName().c_str(), item.getName().c_str(), value.c_str()); + logger_->log_debug("property name %s value %s", prop.getName().c_str(), item.getName().c_str(), value.c_str()); return true; } else { Property newProp(prop); diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index c5202497d2..ac1690e6de 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -92,7 +92,7 @@ void Socket::setNonBlocking() { // handle error logger_->log_error("Could not create non blocking to socket", strerror(errno)); } else { - logger_->log_info("Successfully applied O_NONBLOCK to fd"); + logger_->log_debug("Successfully applied O_NONBLOCK to fd"); } } } @@ -144,7 +144,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { // add the listener to the total set FD_SET(socket_file_descriptor_, &total_list_); socket_max_ = socket_file_descriptor_; - logger_->log_info("Created connection with file descriptor %d", socket_file_descriptor_); + logger_->log_debug("Created connection with file descriptor %d", socket_file_descriptor_); return 0; } @@ -193,13 +193,13 @@ int16_t Socket::initialize() { } // we've successfully connected if (port_ > 0 && createConnection(p, addr) >= 0) { - logger_->log_info("Successfully created connection"); + logger_->log_debug("Successfully created connection"); return 0; break; } } - logger_->log_info("Could not find device for our connection"); + logger_->log_debug("Could not find device for our connection"); return -1; } @@ -390,15 +390,15 @@ int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) { while (buflen) { int16_t fd = select_descriptor(1000); if (fd < 0) { - logger_->log_info("fd %d close %i", fd, buflen); + logger_->log_debug("fd %d close %i", fd, buflen); close(socket_file_descriptor_); return -1; } int bytes_read = recv(fd, buf, buflen, 0); - logger_->log_info("Recv call %d", bytes_read); + logger_->log_trace("Recv call %d", bytes_read); if (bytes_read <= 0) { if (bytes_read == 0) { - logger_->log_info("Other side hung up on %d", fd); + logger_->log_debug("Other side hung up on %d", fd); } else { if (errno == EAGAIN || errno == EWOULDBLOCK) { // continue diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index 7094fd3b44..491e78b431 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -99,7 +99,7 @@ void SiteToSiteClient::deleteTransaction(std::string transactionID) { transaction = it->second; } - logger_->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); + logger_->log_debug("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); known_transactions_.erase(transactionID); } @@ -135,7 +135,7 @@ int SiteToSiteClient::writeResponse(const std::shared_ptr &transact void SiteToSiteClient::tearDown() { if (peer_state_ >= ESTABLISHED) { - logger_->log_info("Site2Site Protocol tearDown"); + logger_->log_debug("Site2Site Protocol tearDown"); // need to write shutdown request writeRequestType(SHUTDOWN); } @@ -188,7 +188,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptrlog_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str()); + logger_->log_debug("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str()); if (resp == 0) { uint64_t endTime = getTimeMillis(); std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); @@ -218,7 +218,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptrlog_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes); + logger_->log_debug("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes); } catch (std::exception &exception) { if (transaction) deleteTransaction(transactionID); @@ -280,7 +280,7 @@ bool SiteToSiteClient::confirm(std::string transactionID) { // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); - logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), transactionID.c_str()); + logger_->log_debug("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), transactionID.c_str()); ret = writeResponse(transaction, CONFIRM_TRANSACTION, crc); if (ret <= 0) return false; @@ -291,18 +291,18 @@ bool SiteToSiteClient::confirm(std::string transactionID) { return false; if (code == CONFIRM_TRANSACTION) { - logger_->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); transaction->_state = TRANSACTION_CONFIRMED; return true; } else if (code == BAD_CHECKSUM) { - logger_->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); return false; } else { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); + logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); return false; } } else { - logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.c_str()); + logger_->log_debug("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.c_str()); ret = writeResponse(transaction, FINISH_TRANSACTION, "FINISH_TRANSACTION"); if (ret <= 0) return false; @@ -312,19 +312,19 @@ bool SiteToSiteClient::confirm(std::string transactionID) { // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response if (code == CONFIRM_TRANSACTION) { - logger_->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); + logger_->log_debug("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); if (this->_currentVersion > 3) { int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); if (message == crc) { - logger_->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s CRC matched", transactionID.c_str()); ret = writeResponse(transaction, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); if (ret <= 0) return false; transaction->_state = TRANSACTION_CONFIRMED; return true; } else { - logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); + logger_->log_debug("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); ret = writeResponse(transaction, BAD_CHECKSUM, "BAD_CHECKSUM"); return false; } @@ -335,7 +335,7 @@ bool SiteToSiteClient::confirm(std::string transactionID) { transaction->_state = TRANSACTION_CONFIRMED; return true; } else { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); + logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); return false; } return false; @@ -413,7 +413,7 @@ bool SiteToSiteClient::complete(std::string transactionID) { transaction->_state = TRANSACTION_COMPLETED; return true; } else { - logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s send finished", transactionID.c_str()); ret = this->writeResponse(transaction, TRANSACTION_FINISHED, "Finished"); if (ret <= 0) { return false; @@ -433,11 +433,11 @@ bool SiteToSiteClient::complete(std::string transactionID) { return false; if (code == TRANSACTION_FINISHED) { - logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s peer finished transaction", transactionID.c_str()); transaction->_state = TRANSACTION_COMPLETED; return true; } else { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); + logger_->log_debug("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); return false; } } @@ -448,7 +448,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co std::shared_ptr transaction = NULL; if (flowFile && !flowFile->getResourceClaim()->exists()) { - logger_->log_info("Claim %s does not exist for FlowFile %s", flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); + logger_->log_debug("Claim %s does not exist for FlowFile %s", flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); return -2; } if (peer_state_ != READY) { @@ -467,12 +467,12 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co } if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) { - logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); return -1; } if (transaction->getDirection() != SEND) { - logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s direction is wrong", transactionID.c_str()); return -1; } @@ -500,7 +500,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co if (ret <= 0) { return -1; } - logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str()); + logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str()); } uint64_t len = 0; @@ -508,14 +508,14 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co len = flowFile->getSize(); ret = transaction->getStream().write(len); if (ret != 8) { - logger_->log_info("ret != 8"); + logger_->log_debug("ret != 8"); return -1; } if (flowFile->getSize() > 0) { sitetosite::ReadCallback callback(packet); session->read(flowFile, &callback); if (flowFile->getSize() != packet->_size) { - logger_->log_info("MisMatched sizes %d %d", flowFile->getSize(), packet->_size); + logger_->log_debug("MisMatched sizes %d %d", flowFile->getSize(), packet->_size); return -2; } } @@ -535,7 +535,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co ret = transaction->getStream().writeData(reinterpret_cast(const_cast(packet->payload_.c_str())), len); if (ret != (int64_t)len) { - logger_->log_info("ret != len"); + logger_->log_debug("ret != len"); return -1; } packet->_size += len; @@ -545,7 +545,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co transaction->total_transfers_++; transaction->_state = DATA_EXCHANGED; transaction->_bytes += len; - logger_->log_info("Site2Site transaction %s send flow record %d, total length %d, added %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); + logger_->log_debug("Site2Site transaction %s send flow record %d, total length %d, added %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); return 0; } @@ -571,12 +571,12 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo } if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) { - logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); return false; } if (transaction->getDirection() != RECEIVE) { - logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s direction is wrong", transactionID.c_str()); return false; } @@ -596,21 +596,21 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo return false; } if (code == CONTINUE_TRANSACTION) { - logger_->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); transaction->_dataAvailable = true; } else if (code == FINISH_TRANSACTION) { - logger_->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); + logger_->log_debug("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); transaction->_dataAvailable = false; eof = true; return true; } else { - logger_->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); + logger_->log_debug("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); return false; } } if (!transaction->isDataAvailable()) { - logger_->log_info("No data is available"); + logger_->log_debug("No data is available"); eof = true; return true; } @@ -618,13 +618,13 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo // start to read the packet uint32_t numAttributes; ret = transaction->getStream().read(numAttributes); - logger_->log_info("returning true/false because ret is %d %d", ret, numAttributes); + logger_->log_debug("returning true/false because ret is %d %d", ret, numAttributes); if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) { return false; } // read the attributes - logger_->log_info("Site2Site transaction %s receives attribute key %llu", transactionID.c_str(), numAttributes); + logger_->log_debug("Site2Site transaction %s receives attribute key %llu", transactionID.c_str(), numAttributes); for (unsigned int i = 0; i < numAttributes; i++) { std::string key; std::string value; @@ -637,7 +637,7 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo return false; } packet->_attributes[key] = value; - logger_->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); + logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); } uint64_t len; @@ -651,14 +651,14 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo transaction->current_transfers_++; transaction->total_transfers_++; } else { - logger_->log_info("Site2Site transaction %s receives attribute ?", transactionID); + logger_->log_debug("Site2Site transaction %s receives attribute ?", transactionID); transaction->_dataAvailable = false; eof = true; return true; } transaction->_state = DATA_EXCHANGED; transaction->_bytes += len; - logger_->log_info("Site2Site transaction %s receives flow record %d, total length %d, added %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); + logger_->log_debug("Site2Site transaction %s receives flow record %d, total length %d, added %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); return true; } @@ -726,7 +726,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptrgetSize() << " bytes, but actually sent " << packet._size; throw Exception(SITE2SITE_EXCEPTION, message.str().c_str()); } else { - logger_->log_info("received %d with expected %d", flowFile->getSize(), packet._size); + logger_->log_debug("received %d with expected %d", flowFile->getSize(), packet._size); } } core::Relationship relation; // undefined relationship diff --git a/libminifi/test/curl-tests/CMakeLists.txt b/libminifi/test/curl-tests/CMakeLists.txt index ff68b8e85b..b645da1a9e 100644 --- a/libminifi/test/curl-tests/CMakeLists.txt +++ b/libminifi/test/curl-tests/CMakeLists.txt @@ -73,4 +73,4 @@ add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCE add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/") add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site") add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/") -add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") +add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/") diff --git a/libminifi/test/resources/ThreadPoolAdjust.yml b/libminifi/test/resources/ThreadPoolAdjust.yml index 8b3c989776..602f26f7d3 100644 --- a/libminifi/test/resources/ThreadPoolAdjust.yml +++ b/libminifi/test/resources/ThreadPoolAdjust.yml @@ -43,7 +43,7 @@ Processors: auto-terminated relationships list: Properties: Base Path: urlofchampions - Listening Port: 10016 + Listening Port: 10099 - name: Invoke id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.InvokeHTTP @@ -59,7 +59,7 @@ Processors: HTTP Method: POST Use Chunked Encoding: true Content-type: text/html - Remote URL: http://localhost:10016/urlofchampions + Remote URL: http://localhost:10099/urlofchampions - name: Loggit id: 2438e3c8-015a-1000-79ca-83af40ec1993 class: org.apache.nifi.processors.standard.LogAttribute From b1112b454e7535d7791bf0c8aafa4a969fb208a1 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 22 Dec 2017 10:00:06 -0500 Subject: [PATCH 2/2] MINIFICPP-355: Avoid mismanagement of integers in log statements --- extensions/http-curl/protocols/RESTReceiver.h | 7 +- libminifi/include/FlowControlProtocol.h | 4 +- libminifi/include/capi/Plan.h | 2 +- libminifi/include/core/ClassLoader.h | 4 - libminifi/include/core/Property.h | 9 ++ .../StandardControllerServiceProvider.h | 2 +- libminifi/include/core/logging/Logger.h | 138 +++++++++++++++++- .../core/repository/VolatileRepository.h | 10 +- libminifi/include/io/AtomicEntryStream.h | 2 - libminifi/include/provenance/Provenance.h | 1 - .../include/sitetosite/RawSocketProtocol.h | 11 +- libminifi/include/sitetosite/SiteToSite.h | 63 ++------ .../include/sitetosite/SiteToSiteClient.h | 12 +- libminifi/include/utils/Id.h | 2 +- libminifi/include/utils/StringUtils.h | 20 ++- libminifi/src/Connection.cpp | 2 +- libminifi/src/FlowControlProtocol.cpp | 14 +- libminifi/src/FlowController.cpp | 1 - libminifi/src/FlowFileRecord.cpp | 12 +- libminifi/src/RemoteProcessorGroupPort.cpp | 7 +- libminifi/src/ThreadedSchedulingAgent.cpp | 4 +- libminifi/src/capi/Plan.cpp | 2 - libminifi/src/capi/api.cpp | 2 +- libminifi/src/core/ClassLoader.cpp | 6 +- libminifi/src/core/ProcessSession.cpp | 24 ++- .../src/core/logging/LoggerConfiguration.cpp | 2 +- .../SiteToSiteProvenanceReportingTask.cpp | 8 +- .../repository/VolatileContentRepository.cpp | 4 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 29 ++-- libminifi/src/io/ClientSocket.cpp | 2 +- libminifi/src/io/FileStream.cpp | 4 +- libminifi/src/processors/ExecuteProcess.cpp | 16 ++ libminifi/src/processors/GetTCP.cpp | 4 +- libminifi/src/processors/ListenHTTP.cpp | 2 +- libminifi/src/processors/TailFile.cpp | 20 ++- libminifi/src/provenance/Provenance.cpp | 8 +- .../src/sitetosite/RawSocketProtocol.cpp | 55 ++++++- libminifi/src/sitetosite/SiteToSite.cpp | 57 ++++++++ libminifi/src/sitetosite/SiteToSiteClient.cpp | 19 ++- libminifi/src/utils/Id.cpp | 25 ++-- libminifi/test/unit/IdTests.cpp | 2 +- 41 files changed, 420 insertions(+), 198 deletions(-) create mode 100644 libminifi/src/sitetosite/SiteToSite.cpp diff --git a/extensions/http-curl/protocols/RESTReceiver.h b/extensions/http-curl/protocols/RESTReceiver.h index 77cfc902a9..b0de62ab9b 100644 --- a/extensions/http-curl/protocols/RESTReceiver.h +++ b/extensions/http-curl/protocols/RESTReceiver.h @@ -70,9 +70,10 @@ class RESTReceiver : public RESTProtocol, public HeartBeatReporter { currentvalue = resp_; } - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - currentvalue.length()); + std::stringstream output; + output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n"; + + mg_printf(conn, "%s", output.str().c_str()); mg_printf(conn, "%s", currentvalue.c_str()); return true; } diff --git a/libminifi/include/FlowControlProtocol.h b/libminifi/include/FlowControlProtocol.h index c0781b87b5..56b1f62e66 100644 --- a/libminifi/include/FlowControlProtocol.h +++ b/libminifi/include/FlowControlProtocol.h @@ -172,12 +172,12 @@ class FlowControlProtocol { logger_->log_info("NiFi Server Name %s", _serverName.c_str()); } if (configure->get(Configure::nifi_server_port, value) && core::Property::StringToInt(value, _serverPort)) { - logger_->log_info("NiFi Server Port: [%d]", _serverPort); + logger_->log_info("NiFi Server Port: [%ll]", _serverPort); } if (configure->get(Configure::nifi_server_report_interval, value)) { core::TimeUnit unit; if (core::Property::StringToTime(value, _reportInterval, unit) && core::Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) { - logger_->log_info("NiFi server report interval: [%d] ms", _reportInterval); + logger_->log_info("NiFi server report interval: [%ll] ms", _reportInterval); } } else _reportInterval = 0; diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h index cd9d7568c5..8687168bf0 100644 --- a/libminifi/include/capi/Plan.h +++ b/libminifi/include/capi/Plan.h @@ -94,7 +94,7 @@ class ExecutionPlan { std::atomic finalized; - int location; + uint32_t location; std::shared_ptr current_session_; std::shared_ptr current_flowfile_; diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h index d16a39b639..9297eef140 100644 --- a/libminifi/include/core/ClassLoader.h +++ b/libminifi/include/core/ClassLoader.h @@ -24,7 +24,6 @@ #include "utils/StringUtils.h" #include #include "core/Core.h" -#include "core/logging/Logger.h" #include "io/DataStream.h" namespace org { @@ -270,9 +269,6 @@ class ClassLoader { std::mutex internal_mutex_; std::vector dl_handles_; - - private: - std::shared_ptr logger_; }; template diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h index a8b5c8650d..788e452d49 100644 --- a/libminifi/include/core/Property.h +++ b/libminifi/include/core/Property.h @@ -349,6 +349,15 @@ class Property { return StringToInt(input, output); } + static bool StringToInt(std::string input, int32_t &output) { + return StringToInt(input, output); + } + + // Convert String to Integer + static bool StringToInt(std::string input, uint32_t &output) { + return StringToInt(input, output); + } + protected: // Name std::string name_; diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index 506395416b..ff754888fb 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -107,7 +107,7 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ } virtual void enableAllControllerServices() { - logger_->log_info("Enabling %d controller services", controller_map_->getAllControllerServices().size()); + logger_->log_info("Enabling %ll controller services", controller_map_->getAllControllerServices().size()); for (auto service : controller_map_->getAllControllerServices()) { if (service->canEnable()) { diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h index d7d52b6a2a..a81e604055 100644 --- a/libminifi/include/core/logging/Logger.h +++ b/libminifi/include/core/logging/Logger.h @@ -1,7 +1,4 @@ /** - * @file Logger.h - * Logger class declaration - * This is a C++ wrapper for spdlog, a lightweight C++ logging library * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -23,6 +20,7 @@ #include #include +#include #include "spdlog/spdlog.h" @@ -55,7 +53,72 @@ inline T conditional_conversion(T const& t) { return t; } -class Logger { +typedef enum { + trace = 0, + debug = 1, + info = 2, + warn = 3, + err = 4, + critical = 5, + off = 6 +} LOG_LEVEL; + +class BaseLogger { + public: + + virtual ~BaseLogger() { + + } + virtual void log_string(LOG_LEVEL level, std::string str) = 0; + + virtual bool should_log(const LOG_LEVEL &level) { + return true; + } + +}; + +/** + * LogBuilder is a class to facilitate using the LOG macros below and an associated put-to operator. + * + */ +class LogBuilder { + public: + LogBuilder(BaseLogger *l, LOG_LEVEL level) + : ignore(false), + ptr(l), + level(level) { + if (!l->should_log(level)) { + setIgnore(); + } + } + + ~LogBuilder() { + if (!ignore) + log_string(level); + } + + void setIgnore() { + ignore = true; + } + + void log_string(LOG_LEVEL level) { + ptr->log_string(level, str.str()); + } + + template + LogBuilder &operator<<(const T &o) { + if (!ignore) + str << o; + return *this; + } + + bool ignore; + BaseLogger *ptr; + std::stringstream str; + LOG_LEVEL level; +}; + +class Logger : public BaseLogger { public: /** * @brief Log error message @@ -107,7 +170,64 @@ class Logger { log(spdlog::level::trace, format, args...); } + bool should_log(const LOG_LEVEL &level) { + spdlog::level::level_enum logger_level = spdlog::level::level_enum::info; + switch (level) { + case critical: + logger_level = spdlog::level::level_enum::critical; + break; + case err: + logger_level = spdlog::level::level_enum::err; + break; + case info: + break; + case debug: + logger_level = spdlog::level::level_enum::debug; + break; + case off: + logger_level = spdlog::level::level_enum::off; + break; + case trace: + logger_level = spdlog::level::level_enum::trace; + break; + case warn: + logger_level = spdlog::level::level_enum::warn; + break; + } + + std::lock_guard lock(mutex_); + if (!delegate_->should_log(logger_level)) { + return false; + } + return true; + } + protected: + + virtual void log_string(LOG_LEVEL level, std::string str) { + switch (level) { + case critical: + log_warn(str.c_str()); + break; + case err: + log_error(str.c_str()); + break; + case info: + log_info(str.c_str()); + break; + case debug: + log_debug(str.c_str()); + break; + case trace: + log_trace(str.c_str()); + break; + case warn: + log_warn(str.c_str()); + break; + case off: + break; + } + } Logger(std::shared_ptr delegate) : delegate_(delegate) { } @@ -129,6 +249,16 @@ class Logger { Logger& operator=(Logger const&); }; +#define LOG_DEBUG(x) LogBuilder(x.get(),logging::LOG_LEVEL::debug) + +#define LOG_INFO(x) LogBuilder(x.get(),logging::LOG_LEVEL::info) + +#define LOG_TRACE(x) LogBuilder(x.get(),logging::LOG_LEVEL::trace) + +#define LOG_ERROR(x) LogBuilder(x.get(),logging::LOG_LEVEL::err) + +#define LOG_WARN(x) LogBuilder(x.get(),logging::LOG_LEVEL::warn) + } /* namespace logging */ } /* namespace core */ } /* namespace minifi */ diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 01bf165256..33fcf831e9 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -219,8 +219,8 @@ bool VolatileRepository::initialize(const std::shared_ptr &configu } } - logger_->log_info("Resizing value_vector_ for %s count is %d", getName(), max_count_); - logger_->log_info("Using a maximum size for %s of %u", getName(), max_size_); + logging::LOG_INFO(logger_) << "Resizing value_vector_ for " << getName() << " count is " << max_count_; + logging::LOG_INFO(logger_) << "Using a maximum size for " << getName() << " of " << max_size_; value_vector_.reserve(max_count_); for (uint32_t i = 0; i < max_count_; i++) { value_vector_.emplace_back(new AtomicEntry(¤t_size_, &max_size_)); @@ -254,7 +254,7 @@ bool VolatileRepository::Put(T key, const uint8_t *buf, size_t bufLen) { } updated = value_vector_.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size); - logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to %d", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load()); + logger_->log_debug("Set repo value at %ll out of %ll updated %ll current_size %ll, adding %ll to %ll", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load()); if (updated && reclaimed_size > 0) { std::lock_guard lock(mutex_); emplace(old_value); @@ -273,7 +273,7 @@ bool VolatileRepository::Put(T key, const uint8_t *buf, size_t bufLen) { } while (!updated); current_size_ += size; - logger_->log_debug("VolatileRepository -- put %d %d", current_size_.load(), current_index_.load()); + logger_->log_debug("VolatileRepository -- put %ll %ll", current_size_.load(), current_index_.load()); return true; } /** @@ -344,7 +344,7 @@ bool VolatileRepository::DeSerialize(std::vector bool VolatileRepository::DeSerialize(std::vector> &store, size_t &max_size) { - logger_->log_debug("VolatileRepository -- DeSerialize %d", current_size_.load()); + logger_->log_debug("VolatileRepository -- DeSerialize %ll", current_size_.load()); max_size = 0; for (auto ent : value_vector_) { // let the destructor do the cleanup diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 7ddf9dfdfa..d383125810 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -145,8 +145,6 @@ int AtomicEntryStream::writeData(uint8_t *value, int size) { length_ = offset_; } return size; - } else { - logger_->log_debug("Cannot insert %d bytes due to insufficient space in atomic entry", size); } } diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 6d9895a147..72fd379032 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -467,7 +467,6 @@ class ProvenanceReporter { // Add event void add(ProvenanceEventRecord *event) { _events.insert(event); - logger_->log_debug("Prove reporter now %d", _events.size()); } // Remove event void remove(ProvenanceEventRecord *event) { diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h b/libminifi/include/sitetosite/RawSocketProtocol.h index b54b3801c3..7a075bf658 100644 --- a/libminifi/include/sitetosite/RawSocketProtocol.h +++ b/libminifi/include/sitetosite/RawSocketProtocol.h @@ -53,6 +53,7 @@ namespace nifi { namespace minifi { namespace sitetosite { + /** * Site2Site Peer */ @@ -64,6 +65,10 @@ typedef struct Site2SitePeerStatus { // RawSiteToSiteClient Class class RawSiteToSiteClient : public sitetosite::SiteToSiteClient { public: + + // HandShakeProperty Str + static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY]; + // Constructor /*! * Create a new control protocol @@ -153,9 +158,9 @@ class RawSiteToSiteClient : public sitetosite::SiteToSiteClient { virtual int writeRespond(const std::shared_ptr &transaction, RespondCode code, std::string message); // getRespondCodeContext virtual RespondCodeContext *getRespondCodeContext(RespondCode code) { - for (unsigned int i = 0; i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) { - if (respondCodeContext[i].code == code) { - return &respondCodeContext[i]; + for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) { + if (SiteToSiteRequest::respondCodeContext[i].code == code) { + return &SiteToSiteRequest::respondCodeContext[i]; } } return NULL; diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h index 8219f58c74..b7e5ce0853 100644 --- a/libminifi/include/sitetosite/SiteToSite.h +++ b/libminifi/include/sitetosite/SiteToSite.h @@ -89,43 +89,6 @@ typedef enum { MAX_HANDSHAKE_PROPERTY } HandshakeProperty; -// HandShakeProperty Str -static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { -/** - * Boolean value indicating whether or not the contents of a FlowFile should - * be GZipped when transferred. - */ -"GZIP", -/** - * The unique identifier of the port to communicate with - */ -"PORT_IDENTIFIER", -/** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ -"REQUEST_EXPIRATION_MILLIS", -/** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ -"BATCH_COUNT", -/** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ -"BATCH_SIZE", -/** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ -"BATCH_DURATION" }; - typedef enum { RAW, HTTP @@ -210,8 +173,6 @@ typedef enum { MAX_REQUEST_TYPE } RequestType; -// Request Type Str -static const char *RequestTypeStr[MAX_REQUEST_TYPE] = { "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", "RECEIVE_FLOWFILES", "SHUTDOWN" }; // Respond Code typedef enum { @@ -226,7 +187,7 @@ typedef enum { // transaction indicators CONTINUE_TRANSACTION = 10, FINISH_TRANSACTION = 11, - CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum + CONFIRM_TRANSACTION = 12,// "Explanation" of this code is the checksum TRANSACTION_FINISHED = 13, TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14, CANCEL_TRANSACTION = 15, @@ -244,23 +205,23 @@ typedef enum { ABORT = 250, UNRECOGNIZED_RESPONSE_CODE = 254, END_OF_STREAM = 255 -} RespondCode; +}RespondCode; // Respond Code Class typedef struct { RespondCode code; - const char *description; - bool hasDescription; + const char *description;bool hasDescription; } RespondCodeContext; -// Respond Code Context -static RespondCodeContext respondCodeContext[] = { { RESERVED, "Reserved for Future Use", false }, { PROPERTIES_OK, "Properties OK", false }, { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true }, - { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, { MISSING_PROPERTY, "Missing Property", true }, { CONTINUE_TRANSACTION, "Continue Transaction", false }, { FINISH_TRANSACTION, - "Finish Transaction", false }, { CONFIRM_TRANSACTION, "Confirm Transaction", true }, { TRANSACTION_FINISHED, "Transaction Finished", false }, { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, - "Transaction Finished But Destination is Full", false }, { CANCEL_TRANSACTION, "Cancel Transaction", true }, { BAD_CHECKSUM, "Bad Checksum", false }, { MORE_DATA, "More Data Exists", false }, - { NO_MORE_DATA, "No More Data Exists", false }, { UNKNOWN_PORT, "Unknown Port", false }, { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, { PORTS_DESTINATION_FULL, - "Port's Destination is Full", false }, { UNAUTHORIZED, "User Not Authorized", true }, { ABORT, "Abort", true }, { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, { - END_OF_STREAM, "End of Stream", false } }; + + +// Request Type Str +class SiteToSiteRequest { +public: + static const char *RequestTypeStr[MAX_REQUEST_TYPE]; + static RespondCodeContext respondCodeContext[21]; +}; + // Transaction Class class Transaction { diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index 59539a1dc1..3200bedb45 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -221,9 +221,9 @@ class SiteToSiteClient : public core::Connectable { virtual int writeResponse(const std::shared_ptr &transaction, RespondCode code, std::string message); // getRespondCodeContext virtual RespondCodeContext *getRespondCodeContext(RespondCode code) { - for (unsigned int i = 0; i < sizeof(respondCodeContext) / sizeof(RespondCodeContext); i++) { - if (respondCodeContext[i].code == code) { - return &respondCodeContext[i]; + for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) { + if (SiteToSiteRequest::respondCodeContext[i].code == code) { + return &SiteToSiteRequest::respondCodeContext[i]; } } return NULL; @@ -281,14 +281,14 @@ class WriteCallback : public OutputStreamCallback { int size = std::min(len, 16384); int ret = _packet->transaction_->getStream().readData(buffer, size); if (ret != size) { - _packet->logger_reference_->log_error("Site2Site Receive Flow Size %d Failed %d, should have received %d", size, ret, len); + logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len; return -1; } stream->write(buffer, size); len -= size; total += size; } - _packet->logger_reference_->log_info("Received %d from stream",len); + logging::LOG_INFO(_packet->logger_reference_) << "Received " << len << " from stream"; return len; } }; @@ -315,7 +315,7 @@ class ReadCallback : public InputStreamCallback { } int ret = _packet->transaction_->getStream().writeData(buffer, readSize); if (ret != readSize) { - _packet->logger_reference_->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret); + logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret; return -1; } size += readSize; diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h index 8431548802..d9f0811d30 100644 --- a/libminifi/include/utils/Id.h +++ b/libminifi/include/utils/Id.h @@ -51,8 +51,8 @@ class IdGenerator { uint64_t getRandomDeviceSegment(int numBits); private: IdGenerator(); - std::shared_ptr logger_; int implementation_; + std::shared_ptr logger_; unsigned char deterministic_prefix_[8]; std::atomic incrementor_; }; diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index 5dce17b909..ddd8307514 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -95,13 +95,21 @@ class StringUtils { static std::vector split(const std::string &str, const std::string &delimiter) { std::vector result; - size_t last = 0; - size_t next = 0; - while ((next = str.find(delimiter, last)) != std::string::npos) { - result.push_back(str.substr(last, next - last)); - last = next + delimiter.length(); + auto curr = str.begin(); + auto end = str.end(); + auto is_func = [delimiter](int s) { + return delimiter.at(0) == s; + }; + while (curr != end) { + curr = std::find_if_not(curr, end, is_func); + if (curr == end) { + break; + } + auto next = std::find_if(curr, end, is_func); + result.push_back(std::string(curr, next)); + curr = next; } - result.push_back(str.substr(last, next - last)); + return result; } diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 082063d487..5b9187f363 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -91,7 +91,7 @@ void Connection::put(std::shared_ptr flow) { queued_data_size_ += flow->getSize(); - logger_->log_debug("Enqueue flow file UUID %s to connection %s %d", flow->getUUIDStr(), name_, queue_.size()); + logger_->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr(), name_); } if (!flow->isStored()) { diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 74a1573263..b5fc9282fc 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -102,12 +102,12 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) { status = connect(sock, (struct sockaddr *) &sa, socklen); if (status < 0) { - logger_->log_error("socket connect failed to %s %d", host, port); + logger_->log_error("socket connect failed to %s %ll", host, port); close(sock); return 0; } - logger_->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port); + logger_->log_info("Flow Control Protocol socket %ll connect to server %s port %ll success", sock, host, port); return sock; } @@ -292,9 +292,9 @@ int FlowControlProtocol::sendRegisterReq() { return -1; } logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); - logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); + logger_->log_info("Flow Control Protocol receive Seq Num %ll", hdr.seqNumber); logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); + logger_->log_info("Flow Control Protocol receive Payload len %ll", hdr.payloadLen); if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { this->_registered = true; @@ -317,7 +317,7 @@ int FlowControlProtocol::sendRegisterReq() { // Fixed 4 bytes uint32_t reportInterval; payloadPtr = this->decode(payloadPtr, reportInterval); - logger_->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); + logger_->log_info("Flow Control Protocol receive report interval %ll ms", reportInterval); this->_reportInterval = reportInterval; } else { break; @@ -386,9 +386,9 @@ int FlowControlProtocol::sendReportReq() { return -1; } logger_->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); - logger_->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); + logger_->log_info("Flow Control Protocol receive Seq Num %ll", hdr.seqNumber); logger_->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - logger_->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); + logger_->log_info("Flow Control Protocol receive Payload len %ll", hdr.payloadLen); if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { this->_seqNumber++; diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 95b9c522ca..7e067275fb 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -310,7 +310,6 @@ void FlowController::loadFlowRepo() { if (this->root_ != nullptr) { this->root_->getConnections(connectionMap); } - logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); flow_file_repo_->setConnectionMap(connectionMap); flow_file_repo_->loadComponent(content_repo_); } else { diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index ec5b6e8d44..38164852c4 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -119,7 +119,7 @@ void FlowFileRecord::releaseClaim(std::shared_ptr claim) { // Decrease the flow file record owned count for the resource claim claim_->decreaseFlowFileRecordOwnedCount(); std::string value; - logger_->log_debug("Delete Resource Claim %s, %s, attempt %d", getUUIDStr(), claim_->getContentFullPath().c_str(), claim_->getFlowFileRecordOwnedCount()); + logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath().c_str(), claim_->getFlowFileRecordOwnedCount()); if (claim_->getFlowFileRecordOwnedCount() <= 0) { // we cannot rely on the stored variable here since we aren't guaranteed atomicity if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) { @@ -186,17 +186,15 @@ bool FlowFileRecord::DeSerialize(std::string key) { if (!ret) { logger_->log_error("NiFi FlowFile Store event %s can not found", key.c_str()); return false; - } else { - logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(), value.length()); } io::DataStream stream((const uint8_t*) value.data(), value.length()); ret = DeSerialize(stream); if (ret) { - logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s success", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str()); + logger_->log_debug("NiFi FlowFile retrieve uuid %s size %llu connection %s success", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str()); } else { - logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d fail", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str()); + logger_->log_debug("NiFi FlowFile retrieve uuid %s size %llu connection %s fail", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str()); } return ret; @@ -265,10 +263,10 @@ bool FlowFileRecord::Serialize() { } if (flow_repository_->Put(uuidStr_, const_cast(outStream.getBuffer()), outStream.getSize())) { - logger_->log_debug("NiFi FlowFile Store event %s size %d success", uuidStr_.c_str(), outStream.getSize()); + logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_.c_str(), outStream.getSize()); return true; } else { - logger_->log_error("NiFi FlowFile Store event %s size %d fail", uuidStr_.c_str(), outStream.getSize()); + logger_->log_error("NiFi FlowFile Store event %s size %llu fail", uuidStr_.c_str(), outStream.getSize()); return false; } diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 1084efbdba..caddc32f5c 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -71,7 +71,7 @@ std::unique_ptr RemoteProcessorGroupPort::getNextP nextProtocol = sitetosite::createClient(config); } else if (peer_index_ >= 0) { std::lock_guard lock(peer_mutex_); - logger_->log_info("Creating client from peer %d", peer_index_.load()); + logger_->log_info("Creating client from peer %ll", peer_index_.load()); sitetosite::SiteToSiteClientConfiguration config(stream_factory_, peers_[this->peer_index_].getPeer(), client_type_); peer_index_++; @@ -283,7 +283,7 @@ void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { logger_->log_info("process group remote site2site port %d, is secure %d", site2site_port_, site2site_secure_); } } else { - logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %d from %s", client->getResponseCode(), fullUrl); + logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSite2SiteInfo: received HTTP code %ll from %s", client->getResponseCode(), fullUrl); } } else { logger_->log_error("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed \n"); @@ -303,7 +303,8 @@ void RemoteProcessorGroupPort::refreshPeerList() { protocol->getPeerList(peers_); - logger_->log_info("Have %d peers", peers_.size()); + logging::LOG_INFO(logger_) << "Have " << peers_.size() << " peers"; + if (peers_.size() > 0) peer_index_ = 0; } diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index d74a74a734..8864ba085a 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -45,7 +45,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr processo if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) { core::TimeUnit unit; if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit) && core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit, admin_yield_duration_)) { - logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", admin_yield_duration_); + logger_->log_debug("nifi_administrative_yield_duration: [%ll] ms", admin_yield_duration_); } } @@ -53,7 +53,7 @@ void ThreadedSchedulingAgent::schedule(std::shared_ptr processo if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) { core::TimeUnit unit; if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit) && core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit, bored_yield_duration_)) { - logger_->log_debug("nifi_bored_yield_duration: [%d] ms", bored_yield_duration_); + logger_->log_debug("nifi_bored_yield_duration: [%ll] ms", bored_yield_duration_); } } diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index f038e3dc29..53133d1673 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -62,7 +62,6 @@ bool linkToPrevious) { std::stringstream connection_name; connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); - logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1); std::shared_ptr connection = std::make_shared(flow_repo_, content_repo_, connection_name.str()); connection->setRelationship(relationship); @@ -146,7 +145,6 @@ bool ExecutionPlan::runNextProcessor(std::functionlog_info("Running next processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size()); location++; std::shared_ptr processor = processor_queue_.at(location); diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index e46fcad71d..0fffcb8668 100644 --- a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -240,7 +240,7 @@ flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) { auto execution_plan = static_cast(flow->plan); - int i = 0; + size_t i = 0; for (; i < size; i++) { execution_plan->reset(); while (execution_plan->runNextProcessor()) { diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index ae39eae436..5c2fdf8b90 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -20,7 +20,6 @@ #include #include "core/ClassLoader.h" -#include "core/logging/LoggerConfiguration.h" namespace org { namespace apache { @@ -28,8 +27,7 @@ namespace nifi { namespace minifi { namespace core { -ClassLoader::ClassLoader() - : logger_(logging::LoggerFactory::getLogger()) { +ClassLoader::ClassLoader() { } ClassLoader &ClassLoader::getDefaultClassLoader() { @@ -47,7 +45,6 @@ uint16_t ClassLoader::registerResource(const std::string &resource, const std::s resource_ptr = dlopen(resource.c_str(), RTLD_NOW | RTLD_GLOBAL); } if (!resource_ptr) { - logger_->log_error("Cannot load library: %s", dlerror()); return RESOURCE_FAILURE; } else { std::lock_guard lock(internal_mutex_); @@ -61,7 +58,6 @@ uint16_t ClassLoader::registerResource(const std::string &resource, const std::s createFactory* create_factory_func = reinterpret_cast(dlsym(resource_ptr, resourceFunction.c_str())); const char* dlsym_error = dlerror(); if (dlsym_error) { - logger_->log_error("Cannot load library: %s", dlsym_error); return RESOURCE_FAILURE; } diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 7f31647fa8..5bcaddaad7 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -139,9 +139,9 @@ std::shared_ptr ProcessSession::clone(const std::shared_ptr record = this->create(parent); if (record) { if (parent->getResourceClaim()) { - if ((uint64_t)(offset + size) > parent->getSize()) { + if ((uint64_t) (offset + size) > parent->getSize()) { // Set offset and size - logger_->log_error("clone offset %d and size %d exceed parent size %d", offset, size, parent->getSize()); + logger_->log_error("clone offset %ll and size %ll exceed parent size %llu", offset, size, parent->getSize()); // Remove the Add FlowFile for the session std::map >::iterator it = this->_addedFlowFiles.find(record->getUUIDStr()); if (it != this->_addedFlowFiles.end()) @@ -165,7 +165,7 @@ std::shared_ptr ProcessSession::clone(const std::shared_ptr &flow) { flow->setDeleted(true); flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); - logger_->log_debug("Auto terminated %s %d %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr()); + logger_->log_debug("Auto terminated %s %llu %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr()); process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr()); _deletedFlowFiles[flow->getUUIDStr()] = flow; std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr(); @@ -331,7 +331,7 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptrincreaseFlowFileRecordOwnedCount(); std::shared_ptr content_stream = process_context_->getContentRepository()->write(claim); @@ -367,13 +367,13 @@ void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptrsetResourceClaim(claim); - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), + logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); content_stream->closeStream(); std::stringstream details; details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); + auto endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { if (flow && flow->getResourceClaim() == claim) { @@ -400,7 +400,7 @@ void ProcessSession::import(std::string source, const std::shared_ptrincreaseFlowFileRecordOwnedCount(); @@ -439,7 +439,7 @@ void ProcessSession::import(std::string source, const std::shared_ptrsetResourceClaim(claim); - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), + logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); stream->closeStream(); @@ -448,7 +448,7 @@ void ProcessSession::import(std::string source, const std::shared_ptrgetProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); - uint64_t endTime = getTimeMillis(); + auto endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } else { stream->closeStream(); @@ -526,10 +526,8 @@ void ProcessSession::import(std::string source, std::vectorsetResourceClaim(claim); claim->increaseFlowFileRecordOwnedCount(); - - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(), - flowFile->getUUIDStr().c_str()); - + logger_->log_debug("Import offset %llu length %llu into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), + flowFile->getResourceClaim()->getContentFullPath().c_str(), flowFile->getUUIDStr().c_str()); stream->closeStream(); std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr(); uint64_t endTime = getTimeMillis(); diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index c06239be7e..b0868e451a 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -40,7 +40,7 @@ namespace minifi { namespace core { namespace logging { -const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%d %H:%M:%S.%e] [%n] [%l] %v"; +const char* LoggerConfiguration::spdlog_default_pattern = "[%Y-%m-%ll %H:%M:%S.%e] [%n] [%l] %v"; std::vector LoggerProperties::get_keys_of_type(const std::string &type) { std::vector appenders; diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index 542d026da4..05fa6a8c8b 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -109,18 +109,14 @@ void SiteToSiteProvenanceReportingTask::onSchedule(const std::shared_ptr &context, const std::shared_ptr &session) { logger_->log_debug("SiteToSiteProvenanceReportingTask -- onTrigger"); std::vector> records; - logger_->log_debug("batch size %d records", batch_size_); + logging::LOG_DEBUG(logger_) << "batch size " << batch_size_ << " records"; size_t deserialized = batch_size_; std::shared_ptr repo = context->getProvenanceRepository(); std::function()> constructor = []() {return std::make_shared();}; if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) { - logger_->log_debug("Not sending because deserialized is %d", deserialized); return; } - - logger_->log_debug("batch size %d records", batch_size_, deserialized); - - logger_->log_debug("Captured %d records", deserialized); + logging::LOG_DEBUG(logger_) << "Captured " << deserialized << " records"; std::string jsonStr; this->getJsonReport(context, session, records, jsonStr); if (jsonStr.length() <= 0) { diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 60adc1bf6a..7c9aad9f89 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -167,13 +167,13 @@ bool VolatileContentRepository::remove(const std::shared_ptrgetContentFullPath()); if (ptr->freeValue(claim)) { - logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load()); + logger_->log_debug("Removed %s", claim->getContentFullPath()); return true; } else { logger_->log_debug("free failed for %s", claim->getContentFullPath()); } } else { - logger_->log_debug("Could not remove for %s, size is %d", claim->getContentFullPath(), current_size_.load()); + logger_->log_debug("Could not remove %s", claim->getContentFullPath()); } } else { std::lock_guard lock(map_mutex_); diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 661cfe82ec..cc399fbbc7 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -18,6 +18,7 @@ #include "core/yaml/YamlConfiguration.h" #include +#include #include #include #include @@ -34,7 +35,7 @@ std::shared_ptr YamlConfiguration::id_generator_ = utils::Id core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { uuid_t uuid; - int64_t version = 0; + int32_t version = 0; checkRequiredField(&rootFlowNode, "name", CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); @@ -175,17 +176,17 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: // Take care of scheduling core::TimeUnit unit; if (core::Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); + logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%ll] ns", schedulingPeriod); processor->setSchedulingPeriodNano(schedulingPeriod); } if (core::Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) && core::Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { - logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); + logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%ll] ms", penalizationPeriod); processor->setPenalizationPeriodMsec(penalizationPeriod); } if (core::Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { - logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); + logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%ll] ms", yieldPeriod); processor->setYieldPeriodMsec(yieldPeriod); } @@ -203,7 +204,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy); } - int64_t maxConcurrentTasks; + int32_t maxConcurrentTasks; if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); processor->setMaxConcurrentTasks((uint8_t) maxConcurrentTasks); @@ -274,7 +275,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P logger_->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod); if (core::Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) && core::Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) { - logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue); + logger_->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%ll] ms", yieldPeriodValue); group->setYieldPeriodMsec(yieldPeriodValue); } } @@ -284,7 +285,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P logger_->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout); if (core::Property::StringToTime(timeout, timeoutValue, unit) && core::Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) { - logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue); + logger_->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%ll] ms", timeoutValue); group->setTimeOut(timeoutValue); } } @@ -348,7 +349,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor core::TimeUnit unit; if (core::Property::StringToTime(schedulingPeriodStr, schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - logger_->log_debug("ProvenanceReportingTask schedulingPeriod %d ns", schedulingPeriod); + logger_->log_debug("ProvenanceReportingTask schedulingPeriod %ll ns", schedulingPeriod); processor->setSchedulingPeriodNano(schedulingPeriod); } @@ -367,7 +368,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor if (node["port"]) { auto portStr = node["port"].as(); if (core::Property::StringToInt(portStr, lvalue)) { - logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue); + logger_->log_debug("ProvenanceReportingTask port %ll", lvalue); reportTask->setPort((uint16_t) lvalue); } } @@ -474,20 +475,20 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P if (connectionNode["max work queue size"]) { auto max_work_queue_str = connectionNode["max work queue size"].as(); - int64_t max_work_queue_size = 0; + uint64_t max_work_queue_size = 0; if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) { connection->setMaxQueueSize(max_work_queue_size); } - logger_->log_debug("Setting %d as the max queue size for %s", max_work_queue_size, name); + logging::LOG_DEBUG(logger_) << "Setting " << max_work_queue_size << " as the max queue size for " << name; } if (connectionNode["max work queue data size"]) { auto max_work_queue_str = connectionNode["max work queue data size"].as(); - int64_t max_work_queue_data_size = 0; + uint64_t max_work_queue_data_size = 0; if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) { connection->setMaxQueueDataSize(max_work_queue_data_size); } - logger_->log_debug("Setting %d as the max queue data size for %s", max_work_queue_data_size, name); + logging::LOG_DEBUG(logger_) << "Setting " << max_work_queue_data_size << " as the max queue data size for " << name; } if (connectionNode["source id"]) { @@ -618,7 +619,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup * if (inputPortsObj["max concurrent tasks"]) { auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as(); - int64_t maxConcurrentTasks; + int32_t maxConcurrentTasks; if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { processor->setMaxConcurrentTasks(maxConcurrentTasks); } diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index ac1690e6de..39fc982c5a 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -79,7 +79,7 @@ void Socket::closeStream() { addr_info_ = 0; } if (socket_file_descriptor_ >= 0) { - logger_->log_debug("Closing %d", socket_file_descriptor_); + logging::LOG_INFO(logger_) << "Closing " << socket_file_descriptor_; close(socket_file_descriptor_); socket_file_descriptor_ = -1; } diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index abf2ce25b6..e903d95f25 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -142,7 +142,9 @@ int FileStream::readData(uint8_t *buf, int buflen) { size_t ret = len - offset_; offset_ = len; length_ = len; - logger_->log_info("%s eof bit, ended at %d", path_, offset_); + std::stringstream str; + str << path_ << " eof bit, ended at " << offset_; + logger_->log_info(str.str().c_str()); return ret; } else { offset_ += buflen; diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index 622f30a90b..13d52c8029 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -27,6 +27,16 @@ #include "utils/StringUtils.h" #include "utils/TimeUtil.h" +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wsign-compare" +#pragma clang diagnostic ignored "-Wunused-result" +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wunused-result" +#endif + namespace org { namespace apache { namespace nifi { @@ -224,3 +234,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi } /* namespace nifi */ } /* namespace apache */ } /* namespace org */ + +#if defined(__clang__) +#pragma clang diagnostic pop +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic pop +#endif diff --git a/libminifi/src/processors/GetTCP.cpp b/libminifi/src/processors/GetTCP.cpp index 215bb5e03e..bfb9a3c7ad 100644 --- a/libminifi/src/processors/GetTCP.cpp +++ b/libminifi/src/processors/GetTCP.cpp @@ -138,7 +138,7 @@ void GetTCP::onSchedule(const std::shared_ptr &context, co core::TimeUnit unit; if (core::Property::StringToTime(value, msec, unit) && core::Property::ConvertTimeUnitToMS(msec, unit, msec)) { reconnect_interval_ = msec; - logger_->log_debug("successfully applied reconnect interval of %d", reconnect_interval_); + logger_->log_debug("successfully applied reconnect interval of %ll", reconnect_interval_); } } else { reconnect_interval_ = 5000; @@ -187,7 +187,7 @@ void GetTCP::onSchedule(const std::shared_ptr &context, co socket_ptr->closeStream(); return -1; } - logger_->log_info("Sleeping for %d msec before attempting to reconnect", reconnect_interval_); + logger_->log_info("Sleeping for %ll msec before attempting to reconnect", reconnect_interval_); std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_)); socket_ring_buffer_.enqueue(std::move(socket_ptr)); } else { diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp index c67b357f89..62f819494a 100644 --- a/libminifi/src/processors/ListenHTTP.cpp +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -225,7 +225,7 @@ void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { auto req_info = mg_get_request_info(conn); - logger_->log_info("ListenHTTP handling POST request of length %d", req_info->content_length); + logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length); // If this is a two-way TLS connection, authorize the peer against the configured pattern if (req_info->is_ssl && req_info->client_cert != nullptr) { diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index b0b47ecf34..490efaea10 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -40,6 +40,14 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wsign-compare" +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-compare" +#endif + namespace org { namespace apache { namespace nifi { @@ -249,10 +257,10 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se char delim = this->_delimiter.c_str()[0]; std::vector> flowFiles; session->import(fullPath, flowFiles, true, this->_currentTailFilePosition, delim); - logger_->log_info("%d flowfiles were received from TailFile input", flowFiles.size()); + logger_->log_info("%ll flowfiles were received from TailFile input", flowFiles.size()); for (auto ffr : flowFiles) { - logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, ffr->getSize()); + logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, ffr->getSize()); std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + ffr->getSize()) + "." + extension; ffr->updateKeyedAttribute(PATH, fileLocation); ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath); @@ -270,7 +278,7 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); session->import(fullPath, flowFile, true, this->_currentTailFilePosition); session->transfer(flowFile, Success); - logger_->log_info("TailFile %s for %d bytes", _currentTailFileName, flowFile->getSize()); + logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, flowFile->getSize()); std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension; flowFile->updateKeyedAttribute(FILENAME, logName); this->_currentTailFilePosition += flowFile->getSize(); @@ -287,3 +295,9 @@ void TailFile::onTrigger(core::ProcessContext *context, core::ProcessSession *se } /* namespace nifi */ } /* namespace apache */ } /* namespace org */ + +#if defined(__clang__) +#pragma clang diagnostic pop +#elif defined(__GNUC__) || defined(__GNUG__) +#pragma GCC diagnostic pop +#endif diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index b46cbc07ef..1edb191daa 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -69,7 +69,7 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptrlog_error("NiFi Provenance Store event %s can not be found", uuidStr_); return false; } else { - logger_->log_debug("NiFi Provenance Read event %s length %d", uuidStr_, value.length()); + logger_->log_debug("NiFi Provenance Read event %s", uuidStr_); } org::apache::nifi::minifi::io::DataStream stream((const uint8_t*) value.data(), value.length()); @@ -77,9 +77,9 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptrlog_debug("NiFi Provenance retrieve event %s size %d eventType %d success", uuidStr_, stream.getSize(), _eventType); + logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType %d success", uuidStr_, stream.getSize(), _eventType); } else { - logger_->log_debug("NiFi Provenance retrieve event %s size %d eventType %d fail", uuidStr_, stream.getSize(), _eventType); + logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType %d fail", uuidStr_, stream.getSize(), _eventType); } return ret; @@ -220,7 +220,7 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptrSerialize(uuidStr_, const_cast(outStream.getBuffer()), outStream.getSize())) { - logger_->log_error("NiFi Provenance Store event %s size %d fail", uuidStr_, outStream.getSize()); + logger_->log_error("NiFi Provenance Store event %s size %llu fail", uuidStr_, outStream.getSize()); } return true; } diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index 43c3157294..df063ff38b 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -42,6 +42,42 @@ namespace sitetosite { std::shared_ptr RawSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator(); std::shared_ptr Transaction::id_generator_ = utils::IdGenerator::getIdGenerator(); +const char *RawSiteToSiteClient::HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { +/** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ +"GZIP", +/** + * The unique identifier of the port to communicate with + */ +"PORT_IDENTIFIER", +/** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ +"REQUEST_EXPIRATION_MILLIS", +/** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ +"BATCH_COUNT", +/** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ +"BATCH_SIZE", +/** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ +"BATCH_DURATION" }; + bool RawSiteToSiteClient::establish() { if (peer_state_ != IDLE) { logger_->log_error("Site2Site peer state is not idle while try to establish"); @@ -112,7 +148,9 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() { if (ret <= 0) { return false; } - logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); + + logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion; + for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { if (serverVersion >= _supportedVersion[i]) { _currentVersion = _supportedVersion[i]; @@ -174,7 +212,8 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() { if (ret <= 0) { return false; } - logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); + logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion; + for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { if (serverVersion >= _supportedCodecVersion[i]) { _currentCodecVersion = _supportedCodecVersion[i]; @@ -339,7 +378,9 @@ bool RawSiteToSiteClient::getPeerList(std::vector &peers) { } PeerStatus status(std::make_shared(port_id_, host, port, secure), count, true); peers.push_back(std::move(status)); - logger_->log_info("Site2Site Peer host %s, port %d, Secure %d", host, port, secure); + std::stringstream str; + str << "Site2Site Peer host " << host << " port " << port << " Secure " << secure; + logger_->log_info(str.str().c_str()); } tearDown(); @@ -354,7 +395,7 @@ int RawSiteToSiteClient::writeRequestType(RequestType type) { if (type >= MAX_REQUEST_TYPE) return -1; - return peer_->writeUTF(RequestTypeStr[type]); + return peer_->writeUTF(SiteToSiteRequest::RequestTypeStr[type]); } int RawSiteToSiteClient::readRequestType(RequestType &type) { @@ -366,7 +407,7 @@ int RawSiteToSiteClient::readRequestType(RequestType &type) { return ret; for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) { - if (RequestTypeStr[i] == requestTypeStr) { + if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) { type = (RequestType) i; return ret; } @@ -592,7 +633,7 @@ bool RawSiteToSiteClient::transmitPayload(const std::shared_ptrlog_info("Site2Site transaction %s send bytes length %d", transactionID.c_str(), payload.length()); + logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID << " sent bytes length" << payload.length(); if (!confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); @@ -600,7 +641,7 @@ bool RawSiteToSiteClient::transmitPayload(const std::shared_ptrlog_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->current_transfers_, transaction->_bytes); + logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID << " successfully send flow record " << transaction->current_transfers_ << " content bytes " << transaction->_bytes; } catch (std::exception &exception) { if (transaction) deleteTransaction(transactionID); diff --git a/libminifi/src/sitetosite/SiteToSite.cpp b/libminifi/src/sitetosite/SiteToSite.cpp new file mode 100644 index 0000000000..7fdd09bd28 --- /dev/null +++ b/libminifi/src/sitetosite/SiteToSite.cpp @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "sitetosite/SiteToSite.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace sitetosite { + +const char *SiteToSiteRequest::RequestTypeStr[MAX_REQUEST_TYPE] = { "NEGOTIATE_FLOWFILE_CODEC", "REQUEST_PEER_LIST", "SEND_FLOWFILES", "RECEIVE_FLOWFILES", "SHUTDOWN" }; + +// Respond Code Context +RespondCodeContext SiteToSiteRequest::respondCodeContext[21] = { //NOLINT + { RESERVED, "Reserved for Future Use", false }, //NOLINT + { PROPERTIES_OK, "Properties OK", false }, //NOLINT + { UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true }, //NOLINT + { ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true }, //NOLINT + { MISSING_PROPERTY, "Missing Property", true }, //NOLINT + { CONTINUE_TRANSACTION, "Continue Transaction", false }, //NOLINT + { FINISH_TRANSACTION, "Finish Transaction", false }, //NOLINT + { CONFIRM_TRANSACTION, "Confirm Transaction", true }, //NOLINT + { TRANSACTION_FINISHED, "Transaction Finished", false }, //NOLINT + { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false }, //NOLINT + { CANCEL_TRANSACTION, "Cancel Transaction", true }, //NOLINT + { BAD_CHECKSUM, "Bad Checksum", false }, //NOLINT + { MORE_DATA, "More Data Exists", false }, //NOLINT + { NO_MORE_DATA, "No More Data Exists", false }, //NOLINT + { UNKNOWN_PORT, "Unknown Port", false }, //NOLINT + { PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true }, //NOLINT + { PORTS_DESTINATION_FULL, "Port's Destination is Full", false }, //NOLINT + { UNAUTHORIZED, "User Not Authorized", true }, //NOLINT + { ABORT, "Abort", true }, //NOLINT + { UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false }, //NOLINT + { END_OF_STREAM, "End of Stream", false } }; + +} /* namespace sitetosite */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index 491e78b431..1f986551b2 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -29,7 +29,7 @@ int SiteToSiteClient::writeRequestType(RequestType type) { if (type >= MAX_REQUEST_TYPE) return -1; - return peer_->writeUTF(RequestTypeStr[type]); + return peer_->writeUTF(SiteToSiteRequest::RequestTypeStr[type]); } int SiteToSiteClient::readRequestType(RequestType &type) { @@ -41,7 +41,7 @@ int SiteToSiteClient::readRequestType(RequestType &type) { return ret; for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) { - if (RequestTypeStr[i] == requestTypeStr) { + if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) { type = (RequestType) i; return ret; } @@ -218,7 +218,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptrlog_debug("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes); + logger_->log_debug("Site2Site transaction %s successfully send flow record %d, content bytes %llu", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes); } catch (std::exception &exception) { if (transaction) deleteTransaction(transactionID); @@ -515,7 +515,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co sitetosite::ReadCallback callback(packet); session->read(flowFile, &callback); if (flowFile->getSize() != packet->_size) { - logger_->log_debug("MisMatched sizes %d %d", flowFile->getSize(), packet->_size); + logger_->log_debug("MisMatched sizes %llu %llu", flowFile->getSize(), packet->_size); return -2; } } @@ -545,7 +545,7 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co transaction->total_transfers_++; transaction->_state = DATA_EXCHANGED; transaction->_bytes += len; - logger_->log_debug("Site2Site transaction %s send flow record %d, total length %d, added %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); + logger_->log_debug("Site2Site transaction %s send flow record %d, total length %llu, added %llu", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); return 0; } @@ -618,13 +618,12 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo // start to read the packet uint32_t numAttributes; ret = transaction->getStream().read(numAttributes); - logger_->log_debug("returning true/false because ret is %d %d", ret, numAttributes); if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) { return false; } // read the attributes - logger_->log_debug("Site2Site transaction %s receives attribute key %llu", transactionID.c_str(), numAttributes); + logger_->log_debug("Site2Site transaction %s receives attribute key %d", transactionID.c_str(), numAttributes); for (unsigned int i = 0; i < numAttributes; i++) { std::string key; std::string value; @@ -658,7 +657,7 @@ bool SiteToSiteClient::receive(std::string transactionID, DataPacket *packet, bo } transaction->_state = DATA_EXCHANGED; transaction->_bytes += len; - logger_->log_debug("Site2Site transaction %s receives flow record %d, total length %d, added %d", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); + logger_->log_debug("Site2Site transaction %s receives flow record %d, total length %llu, added %llu", transactionID.c_str(), transaction->total_transfers_, transaction->_bytes, len); return true; } @@ -726,7 +725,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptrgetSize() << " bytes, but actually sent " << packet._size; throw Exception(SITE2SITE_EXCEPTION, message.str().c_str()); } else { - logger_->log_debug("received %d with expected %d", flowFile->getSize(), packet._size); + logger_->log_debug("received %llu with expected %llu", flowFile->getSize(), packet._size); } } core::Relationship relation; // undefined relationship @@ -748,7 +747,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptrlog_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes); + logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %llu", transactionID.c_str(), transfers, bytes); // we yield the receive if we did not get anything if (transfers == 0) context->yield(); diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp index 0c76a797f6..05e55a2ca5 100644 --- a/libminifi/src/utils/Id.cpp +++ b/libminifi/src/utils/Id.cpp @@ -26,7 +26,6 @@ #include #include #include - #include "core/logging/LoggerConfiguration.h" #include "utils/StringUtils.h" @@ -50,20 +49,20 @@ IdGenerator::IdGenerator() uint64_t IdGenerator::getDeviceSegmentFromString(const std::string& str, int numBits) { uint64_t deviceSegment = 0; - for (int i = 0; i < str.length(); i++) { + for (size_t i = 0; i < str.length(); i++) { unsigned char c = toupper(str[i]); if (c >= '0' && c <= '9') { deviceSegment = deviceSegment + (c - '0'); } else if (c >= 'A' && c <= 'F') { deviceSegment = deviceSegment + (c - 'A' + 10); } else { - logger_->log_error("Expected hex char (0-9, A-F). Got %c.", c); + logging::LOG_ERROR(logger_) << "Expected hex char (0-9, A-F). Got " << c; } deviceSegment = deviceSegment << 4; } deviceSegment <<= 64 - (4 * (str.length() + 1)); deviceSegment >>= 64 - numBits; - logger_->log_debug("Using user defined device segment: %" PRIx64, deviceSegment); + logging::LOG_DEBUG(logger_) << "Using user defined device segment: " << std::hex << deviceSegment; deviceSegment <<= 64 - numBits; return deviceSegment; } @@ -79,7 +78,7 @@ uint64_t IdGenerator::getRandomDeviceSegment(int numBits) { } } deviceSegment >>= 64 - numBits; - logger_->log_debug("Using random device segment: %" PRIx64, deviceSegment); + logging::LOG_DEBUG(logger_) << "Using random defined device segment:" << deviceSegment; deviceSegment <<= 64 - numBits; return deviceSegment; } @@ -90,13 +89,13 @@ void IdGenerator::initialize(const std::shared_ptr & properties) { if (properties->get("uid.implementation", implementation_str)) { std::transform(implementation_str.begin(), implementation_str.end(), implementation_str.begin(), ::tolower); if ("random" == implementation_str) { - logger_->log_debug("Using uuid_generate_random for uids."); + logging::LOG_DEBUG(logger_) << "Using uuid_generate_random for uids."; implementation_ = UUID_RANDOM_IMPL; } else if ("uuid_default" == implementation_str) { - logger_->log_debug("Using uuid_generate for uids."); + logging::LOG_DEBUG(logger_) << "Using uuid_generate for uids."; implementation_ = UUID_DEFAULT_IMPL; } else if ("minifi_uid" == implementation_str) { - logger_->log_debug("Using minifi uid implementation for uids"); + logging::LOG_DEBUG(logger_) << "Using minifi uid implementation for uids"; implementation_ = MINIFI_UID_IMPL; uint64_t timestamp = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); @@ -107,13 +106,13 @@ void IdGenerator::initialize(const std::shared_ptr & properties) { if (properties->get("uid.minifi.device.segment", device_segment)) { prefix = getDeviceSegmentFromString(device_segment, device_bits); } else { - logger_->log_warn("uid.minifi.device.segment not specified, generating random device segment"); + logging::LOG_WARN(logger_) << "uid.minifi.device.segment not specified, generating random device segment"; prefix = getRandomDeviceSegment(device_bits); } timestamp <<= device_bits; timestamp >>= device_bits; prefix = prefix + timestamp; - logger_->log_debug("Using minifi uid prefix: %16" PRIx64, prefix); + logging::LOG_DEBUG(logger_) << "Using minifi uid prefix: " << std::hex << prefix; } for (int i = 0; i < 8; i++) { unsigned char prefix_element = (prefix >> ((7 - i) * 8)) & UNSIGNED_CHAR_MAX; @@ -121,12 +120,12 @@ void IdGenerator::initialize(const std::shared_ptr & properties) { } incrementor_ = 0; } else if ("time" == implementation_str) { - logger_->log_debug("Using uuid_generate_time implementation for uids."); + logging::LOG_DEBUG(logger_) << "Using uuid_generate_time implementation for uids."; } else { - logger_->log_debug("Invalid value for uid.implementation (%s). Using uuid_generate_time implementation for uids.", implementation_str); + logging::LOG_DEBUG(logger_) << "Invalid value for uid.implementation (" << implementation_str << "). Using uuid_generate_time implementation for uids."; } } else { - logger_->log_debug("Using uuid_generate_time implementation for uids."); + logging::LOG_DEBUG(logger_) << "Using uuid_generate_time implementation for uids."; } } diff --git a/libminifi/test/unit/IdTests.cpp b/libminifi/test/unit/IdTests.cpp index fb3083548e..c60aedb5af 100644 --- a/libminifi/test/unit/IdTests.cpp +++ b/libminifi/test/unit/IdTests.cpp @@ -168,6 +168,6 @@ TEST_CASE("Test Hex Device Segment 18 bits", "[id]") { REQUIRE(128 == (uid[2] & 192)); REQUIRE(1 == uid[15]); - REQUIRE(true == LogTestController::getInstance().contains("Using minifi uid prefix: 9af8")); + REQUIRE(true == LogTestController::getInstance().contains("Using minifi uid prefix: 9af8")); LogTestController::getInstance().reset(); }