From 149dd90ed423d54ea2be1bcd27da9fe875399112 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 12 May 2017 15:22:06 -0400 Subject: [PATCH 1/2] MINIFI-37: Create a volatile repository and config items for NoOp and Volatile repository configuration --- README.md | 12 + libminifi/include/core/Core.h | 4 +- libminifi/include/core/Repository.h | 15 +- libminifi/include/core/RepositoryFactory.h | 1 + libminifi/include/core/logging/BaseLogger.h | 3 +- .../core/repository/VolatileRepository.h | 350 ++++++++++++++++++ libminifi/src/FlowController.cpp | 6 +- libminifi/src/core/RepositoryFactory.cpp | 8 + .../core/repository/VolatileRepository.cpp | 63 ++++ libminifi/test/unit/ProvenanceTests.cpp | 85 +++++ main/MiNiFiMain.cpp | 4 + 11 files changed, 543 insertions(+), 8 deletions(-) create mode 100644 libminifi/include/core/repository/VolatileRepository.h create mode 100644 libminifi/src/core/repository/VolatileRepository.cpp diff --git a/README.md b/README.md index df89fb8c1f..1457350a30 100644 --- a/README.md +++ b/README.md @@ -299,7 +299,19 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc if you do not want to enable client certificate base authorization nifi.security.need.ClientAuth=false + +### Configuring Volatile and NO-OP Repositories + in minifi.properties + + # For Volatile Repositories: + nifi.flow.repository.class.name=VolatileRepository + nifi.provenance.repository.class.name=VolatileRepository + + # For NO-OP Repositories: + nifi.flow.repository.class.name=NoOpRepository + nifi.provenance.repository.class.name=NoOpRepository + ### Provenance Report Add Provenance Reporting to config.yml diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 453a6a5d5d..8953dfb31b 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -40,8 +40,10 @@ namespace core { template static inline std::string getClassName() { char *b = abi::__cxa_demangle(typeid(T).name(), 0, 0, 0); + if (b == nullptr) + return std::string(); std::string name = b; - delete[] b; + std::free(b); return name; } diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 48ccc4753e..94517d8e07 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -47,14 +47,21 @@ namespace nifi { namespace minifi { namespace core { +#define REPOSITORY_DIRECTORY "./repo" +#define MAX_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M +#define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute +#define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec + class Repository : public CoreComponent { public: /* * Constructor for the repository */ - Repository(std::string repo_name, std::string directory, - int64_t maxPartitionMillis, int64_t maxPartitionBytes, - uint64_t purgePeriod) + Repository(std::string repo_name="Repository", + std::string directory = REPOSITORY_DIRECTORY, + int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) : CoreComponent(repo_name), thread_() { directory_ = directory; @@ -84,7 +91,7 @@ class Repository : public CoreComponent { } virtual bool Get(std::string key, std::string &value) { - return true; + return false; } // Run function for the thread diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h index ed9a026941..3b3a2cee1f 100644 --- a/libminifi/include/core/RepositoryFactory.h +++ b/libminifi/include/core/RepositoryFactory.h @@ -20,6 +20,7 @@ #define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ #include "core/Repository.h" +#include "core/repository/VolatileRepository.h" #include "Core.h" namespace org { diff --git a/libminifi/include/core/logging/BaseLogger.h b/libminifi/include/core/logging/BaseLogger.h index 9d00fb6e03..904bac21f7 100644 --- a/libminifi/include/core/logging/BaseLogger.h +++ b/libminifi/include/core/logging/BaseLogger.h @@ -54,7 +54,8 @@ typedef enum { template inline std::string format_string(char const* format_str, Args&&... args) { - char buf[LOG_BUFFER_SIZE]; + char buf[LOG_BUFFER_SIZE+1] = {0}; + std::snprintf(buf, LOG_BUFFER_SIZE, format_str, args...); return std::string(buf); } diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h new file mode 100644 index 0000000000..725fa4c3b0 --- /dev/null +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -0,0 +1,350 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ + +#include "core/Repository.h" +#include +#include +#include +#include "core/Core.h" +#include "Connection.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + + +static uint16_t accounting_size = sizeof(void*) * 2 + sizeof(uint64_t) + + sizeof(size_t); + +class RepoValue { + public: + + explicit RepoValue() + { + } + + explicit RepoValue(std::string key, uint8_t *ptr, size_t size) + : + key_(key) + { + buffer_.resize(size); + std::memcpy(buffer_.data(), ptr, size); + } + + explicit RepoValue(RepoValue &&other) noexcept + : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)){ + } + + ~RepoValue() + { + } + + + + std::string &getKey() { + return key_; + } + + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { + return buffer_.size() + accounting_size; + } + + size_t bufferSize() { + return buffer_.size(); + } + + void emplace(std::string &str) { + str.insert(0, reinterpret_cast(buffer_.data()), buffer_.size()); + } + + RepoValue &operator=(RepoValue &&other) noexcept { + key_ = std::move(other.key_); + buffer_ = std::move(other.buffer_); + other.buffer_.clear(); + return *this; + } + + private: + std::string key_; + std::vector buffer_; +}; + +class AtomicEntry { + + public: + AtomicEntry() + : write_pending_(false), + has_value_(false) { + + } + + bool setRepoValue(RepoValue &new_value, size_t &prev_size) { + // delete the underlying pointer + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true) && !lock) + return false; + if (has_value_) + { + prev_size = value_.size(); + } + value_ = std::move(new_value); + has_value_ = true; + try_unlock(); + return true; + } + + bool getValue(RepoValue &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + bool getValue(const std::string &key, RepoValue &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (value_.getKey() != key) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + private: + + inline void try_lock() { + bool lock = false; + while (!write_pending_.compare_exchange_weak(lock, true) && !lock) { + // attempt again + } + } + + inline void try_unlock() { + bool lock = true; + while (!write_pending_.compare_exchange_weak(lock, false) && lock) { + // attempt again + } + } + + std::atomic write_pending_; + std::atomic has_value_; + RepoValue value_; +}; + +/** + * Flow File repository + * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. + */ +class VolatileRepository : public core::Repository, + public std::enable_shared_from_this { + public: + // Constructor + + VolatileRepository(std::string dir = REPOSITORY_DIRECTORY, + int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = + MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) + : Repository(core::getClassName(), "", + maxPartitionMillis, maxPartitionBytes, purgePeriod), + max_size_(maxPartitionBytes * 0.75), + current_index_(0), + max_count_(10000) + + { + + } + + // Destructor + ~VolatileRepository() { + for (auto ent : value_vector_) { + delete ent; + } + } + + // initialize + virtual bool initialize(const std::shared_ptr &configure) { +// configure->get("nifi.volatile.repository.max_size") + max_count_ = 10000; + value_vector_.reserve(max_count_); + for (int i = 0; i < max_count_; i++) { + value_vector_.emplace_back(new AtomicEntry()); + } + return true; + } + + virtual void run(); + + virtual bool Put(std::string key, uint8_t *buf, int bufLen) { + //if (exceedsCapacity(bufLen)) { + //purge(); + //} + RepoValue new_value(key, buf, bufLen); + + logger_->log_info("constructed"); + const size_t size = new_value.size(); + bool updated = false; + size_t reclaimed_size=0; + do { + + int private_index = current_index_.fetch_add(1); + // round robin through the beginning + if (private_index >= max_count_ ) { + uint16_t new_index = 0; + if (current_index_.compare_exchange_weak(new_index, 0)) { + private_index = 0; + } else { + continue; + } + } + logger_->log_info("Set repo value at %d",private_index); + updated = value_vector_.at(private_index)->setRepoValue( + new_value,reclaimed_size); + + if (reclaimed_size > 0) + { + current_size_-= reclaimed_size; + } + + + } while (!updated); + current_size_ += size; + + logger_->log_info("VolatileRepository -- put %s %d %d", key, + current_size_. + load(),current_index_.load()); + return true; + } + /** + *c + * Deletes the key + * @return status of the delete operation + */ + virtual bool Delete(std::string key) { + + logger_->log_info("VolatileRepository -- delete %s", key); + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue value; + if (ent->getValue(key, value)) { + current_size_ -= value.size(); + return true; + } + + } + return false; + } + /** + * Sets the value from the provided key. Once the item is retrieved + * it may not be retrieved again. + * @return status of the get operation. + */ + virtual bool Get(std::string key, std::string &value) { + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue repo_value; + + if (ent->getValue(key, repo_value)) { + current_size_ -= value.size(); + repo_value.emplace(value); + logger_->log_info("VolatileRepository -- get %s %d", key, + current_size_.load()); + return true; + } + + } + return false; + } + + void setConnectionMap( + std::map> &connectionMap) { + this->connectionMap = connectionMap; + } + void loadComponent(); + + void start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&VolatileRepository::run, shared_from_this()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); + } + + protected: + + /** + * Tests whether or not the current size exceeds the capacity + * if the new prospectiveSize is inserted. + * @param prospectiveSize size of item to be added. + */ + inline bool exceedsCapacity(uint32_t prospectiveSize) { + if (current_size_ + prospectiveSize > max_size_) + return true; + else + return false; + } + /** + * Purges the volatile repository. + */ + void purge(); + + private: + std::map> connectionMap; + + std::atomic current_size_; + std::atomic current_index_; + std::vector value_vector_; + uint32_t max_count_; + uint32_t max_size_; + +} +; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ */ diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 5f6e01435d..c87875dd08 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -297,9 +297,11 @@ void FlowController::loadFlowRepo() { } logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); - auto rep = std::static_pointer_cast( + auto rep = std::dynamic_pointer_cast( flow_file_repo_); - rep->setConnectionMap(connectionMap); + if (nullptr != rep) { + rep->setConnectionMap(connectionMap); + } flow_file_repo_->loadComponent(); } else { logger_->log_debug("Flow file repository is not set"); diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index c24a2affd2..ddca87e928 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -24,6 +24,8 @@ #include "provenance/ProvenanceRepository.h" #endif +#include "core/repository/VolatileRepository.h" + namespace org { namespace apache { namespace nifi { @@ -48,9 +50,15 @@ std::shared_ptr createRepository( try { std::shared_ptr return_obj = nullptr; if (class_name_lc == "flowfilerepository") { + std::cout << "creating flow" << std::endl; return_obj = instantiate(); } else if (class_name_lc == "provenancerepository") { return_obj = instantiate(); + } else if (class_name_lc == "volatilerepository") { + return_obj = instantiate(); + } else if (class_name_lc == "nooprepository") { + std::cout << "creating noop" << std::endl; + return_obj = instantiate(); } if (return_obj) { diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp new file mode 100644 index 0000000000..2e7ff4cd41 --- /dev/null +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -0,0 +1,63 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/repository/VolatileRepository.h" +#include +#include +#include +#include "FlowFileRecord.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +void VolatileRepository::run() { + repo_full_ = false; +} + +/** + * Purge + */ +void VolatileRepository::purge() { + while (current_size_ > max_size_) { + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue value; + if (ent->getValue(value)) { + current_size_ -= value.size(); + logger_->log_info("VolatileRepository -- purge %s %d %d %d", + value.getKey(), current_size_.load(), max_size_, + current_index_.load()); + } + if (current_size_ < max_size_) + break; + } + } +} + +void VolatileRepository::loadComponent() { +} + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index f5374b881b..d7a5524eba 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -27,6 +27,7 @@ #include "FlowFileRecord.h" #include "core/Core.h" #include "core/repository/FlowFileRepository.h" +#include "core/repository/VolatileRepository.h" TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { provenance::ProvenanceEventRecord record1( @@ -93,3 +94,87 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { record2.removeChildUuid(childId); REQUIRE(record2.getChildrenUuids().size() == 0); } + +TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", + "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + + std::shared_ptr testRepository =std::make_shared(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository,eventId) == true); + REQUIRE(record2.getEventId() == record1.getEventId()); + REQUIRE(record2.getComponentId() == record1.getComponentId()); + REQUIRE(record2.getComponentType() == record1.getComponentType()); + REQUIRE(record2.getDetails() == record1.getDetails()); + REQUIRE(record2.getDetails() == smileyface); + REQUIRE(record2.getEventDuration() == sample); +} + +TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") { + + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", + "componenttype"); + std::string eventId = record1.getEventId(); + std::map attributes; + attributes.insert(std::pair("potato", "potatoe")); + attributes.insert(std::pair("tomato", "tomatoe")); + std::shared_ptr frepo =std::make_shared(); + frepo->initialize(0); + std::shared_ptr ffr1 = std::make_shared< + minifi::FlowFileRecord>(frepo,attributes); + + record1.addChildFlowFile(ffr1); + + uint64_t sample = 65555; + std::shared_ptr testRepository =std::make_shared(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository,eventId) == true); + REQUIRE(record1.getChildrenUuids().size() == 1); + REQUIRE(record2.getChildrenUuids().size() == 1); + std::string childId = record2.getChildrenUuids().at(0); + REQUIRE(childId == ffr1->getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE(record2.getChildrenUuids().size() == 0); + +} + +TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", + "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + + std::shared_ptr testRepository =std::make_shared(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository,eventId) == false); + +} diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 3eb16aef66..4afb13370f 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -203,6 +203,10 @@ int main(int argc, char **argv) { * Trigger unload -- wait stop_wait_time */ controller->waitUnload(stop_wait_time); + + flow_repo = nullptr; + + prov_repo = nullptr; logger->log_info("MiNiFi exit"); From ecc40a67614089c46c2da2f8487941061361a6c8 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Mon, 22 May 2017 15:47:49 -0400 Subject: [PATCH 2/2] MINIFI-37: Allow the max count of the volatile repository size to be configurable --- libminifi/include/core/Core.h | 11 +- libminifi/include/core/RepositoryFactory.h | 2 +- .../core/repository/FlowFileRepository.h | 12 +- .../core/repository/VolatileRepository.h | 172 ++++++++++-------- libminifi/include/properties/Configure.h | 1 + .../include/provenance/ProvenanceRepository.h | 4 +- libminifi/src/Configure.cpp | 2 + libminifi/src/core/RepositoryFactory.cpp | 10 +- .../core/repository/VolatileRepository.cpp | 3 + libminifi/test/unit/ProvenanceTestHelper.h | 2 +- libminifi/test/unit/ProvenanceTests.cpp | 43 +++-- libminifi/test/unit/RepoTests.cpp | 9 +- main/MiNiFiMain.cpp | 4 +- 13 files changed, 152 insertions(+), 123 deletions(-) diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 8953dfb31b..335f306b7b 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -66,13 +66,18 @@ struct class_operations { }; template -typename std::enable_if::value, std::shared_ptr>::type instantiate() { +typename std::enable_if::value, std::shared_ptr>::type instantiate(const std::string name = "") { throw std::runtime_error("Cannot instantiate class"); } template -typename std::enable_if::value, std::shared_ptr>::type instantiate() { - return std::make_shared(); +typename std::enable_if::value, std::shared_ptr>::type instantiate(const std::string name = "") { + if (name.length() == 0){ + return std::make_shared(); + } + else{ + return std::make_shared(name); + } } /** diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h index 3b3a2cee1f..db474a0fbc 100644 --- a/libminifi/include/core/RepositoryFactory.h +++ b/libminifi/include/core/RepositoryFactory.h @@ -31,7 +31,7 @@ namespace minifi { namespace core { std::shared_ptr createRepository( - const std::string configuration_class_name, bool fail_safe = false); + const std::string configuration_class_name, bool fail_safe = false,const std::string repo_name = ""); } /* namespace core */ } /* namespace minifi */ diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index 9fc13e0972..051dfb0db8 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -47,21 +47,15 @@ class FlowFileRepository : public core::Repository, public: // Constructor - FlowFileRepository(std::string directory, int64_t maxPartitionMillis, - int64_t maxPartitionBytes, uint64_t purgePeriod) - : Repository(core::getClassName(), directory, + FlowFileRepository(const std::string repo_name = "", std::string directory=FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis=MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes=MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod=FLOWFILE_REPOSITORY_PURGE_PERIOD) + : Repository(repo_name.length() > 0 ? repo_name : core::getClassName(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) { db_ = NULL; } - FlowFileRepository() - : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY, - MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, - MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, - FLOWFILE_REPOSITORY_PURGE_PERIOD) { - } // Destructor ~FlowFileRepository() { diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 725fa4c3b0..1e07e280f4 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -24,6 +24,7 @@ #include #include "core/Core.h" #include "Connection.h" +#include "utils/StringUtils.h" namespace org { namespace apache { @@ -32,69 +33,70 @@ namespace minifi { namespace core { namespace repository { - -static uint16_t accounting_size = sizeof(void*) * 2 + sizeof(uint64_t) - + sizeof(size_t); +static uint16_t accounting_size = sizeof(std::vector) + + sizeof(std::string) + sizeof(size_t); class RepoValue { public: - explicit RepoValue() - { + explicit RepoValue() { } explicit RepoValue(std::string key, uint8_t *ptr, size_t size) - : - key_(key) - { - buffer_.resize(size); + : key_(key) { + buffer_.resize(size); std::memcpy(buffer_.data(), ptr, size); + fast_size_ = key.size() + size; } - explicit RepoValue(RepoValue &&other) noexcept - : key_(std::move(other.key_)), - buffer_(std::move(other.buffer_)){ - } - - ~RepoValue() - { - } + explicit RepoValue(RepoValue &&other) +noexcept : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)), + fast_size_(other.fast_size_) { + } - + ~RepoValue() + { + } - std::string &getKey() { - return key_; - } + std::string &getKey() { + return key_; + } - /** - * Return the size of the memory within the key - * buffer, the size of timestamp, and the general - * system word size - */ - uint64_t size() { - return buffer_.size() + accounting_size; - } + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { + return fast_size_; + } - size_t bufferSize() { - return buffer_.size(); - } + size_t bufferSize() { + return buffer_.size(); + } - void emplace(std::string &str) { - str.insert(0, reinterpret_cast(buffer_.data()), buffer_.size()); - } + void emplace(std::string &str) { + str.insert(0, reinterpret_cast(buffer_.data()), buffer_.size()); + } - RepoValue &operator=(RepoValue &&other) noexcept { - key_ = std::move(other.key_); - buffer_ = std::move(other.buffer_); - other.buffer_.clear(); - return *this; - } + RepoValue &operator=(RepoValue &&other) noexcept { + key_ = std::move(other.key_); + buffer_ = std::move(other.buffer_); + other.buffer_.clear(); + return *this; + } - private: - std::string key_; - std::vector buffer_; -}; + private: + size_t fast_size_; + std::string key_; + std::vector buffer_; + }; + /** + * Purpose: Atomic Entry allows us to create a statically + * sized ring buffer, with the ability to create + **/ class AtomicEntry { public: @@ -109,8 +111,7 @@ class AtomicEntry { bool lock = false; if (!write_pending_.compare_exchange_weak(lock, true) && !lock) return false; - if (has_value_) - { + if (has_value_) { prev_size = value_.size(); } value_ = std::move(new_value); @@ -175,21 +176,26 @@ class AtomicEntry { class VolatileRepository : public core::Repository, public std::enable_shared_from_this { public: + + static const char *volatile_repo_max_count; // Constructor - VolatileRepository(std::string dir = REPOSITORY_DIRECTORY, - int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, - int64_t maxPartitionBytes = - MAX_REPOSITORY_STORAGE_SIZE, - uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) - : Repository(core::getClassName(), "", - maxPartitionMillis, maxPartitionBytes, purgePeriod), + VolatileRepository( + std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, + int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = + MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) + : Repository( + repo_name.length() > 0 ? + repo_name : core::getClassName(), + "", maxPartitionMillis, maxPartitionBytes, purgePeriod), max_size_(maxPartitionBytes * 0.75), current_index_(0), max_count_(10000) { - + } // Destructor @@ -199,10 +205,27 @@ class VolatileRepository : public core::Repository, } } - // initialize + /** + * Initialize thevolatile repsitory + **/ virtual bool initialize(const std::shared_ptr &configure) { -// configure->get("nifi.volatile.repository.max_size") - max_count_ = 10000; + std::string value = ""; + + if (configure != nullptr) { + int64_t max_cnt = 0; + std::stringstream strstream; + strstream << Configure::nifi_volatile_repository_options << getName() + << "." << volatile_repo_max_count; + if (configure->get(strstream.str(), value)) { + if (core::Property::StringToInt(value, max_cnt)) { + max_count_ = max_cnt; + } + + } + } + + logger_->log_debug("Resizing value_vector_ for %s count is %d", getName(), + max_count_); value_vector_.reserve(max_count_); for (int i = 0; i < max_count_; i++) { value_vector_.emplace_back(new AtomicEntry()); @@ -212,21 +235,22 @@ class VolatileRepository : public core::Repository, virtual void run(); + /** + * Places a new object into the volatile memory area + * @param key key to add to the repository + * @param buf buffer + **/ virtual bool Put(std::string key, uint8_t *buf, int bufLen) { - //if (exceedsCapacity(bufLen)) { - //purge(); - //} RepoValue new_value(key, buf, bufLen); - logger_->log_info("constructed"); const size_t size = new_value.size(); bool updated = false; - size_t reclaimed_size=0; - do { + size_t reclaimed_size = 0; + do { int private_index = current_index_.fetch_add(1); // round robin through the beginning - if (private_index >= max_count_ ) { + if (private_index >= max_count_) { uint16_t new_index = 0; if (current_index_.compare_exchange_weak(new_index, 0)) { private_index = 0; @@ -234,22 +258,20 @@ class VolatileRepository : public core::Repository, continue; } } - logger_->log_info("Set repo value at %d",private_index); - updated = value_vector_.at(private_index)->setRepoValue( - new_value,reclaimed_size); - - if (reclaimed_size > 0) - { - current_size_-= reclaimed_size; + logger_->log_info("Set repo value at %d out of %d", private_index, + max_count_); + updated = value_vector_.at(private_index)->setRepoValue(new_value, + reclaimed_size); + + if (reclaimed_size > 0) { + current_size_ -= reclaimed_size; } - } while (!updated); current_size_ += size; logger_->log_info("VolatileRepository -- put %s %d %d", key, - current_size_. - load(),current_index_.load()); + current_size_.load(), current_index_.load()); return true; } /** diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 4119edd8bc..2d5f2936bc 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -49,6 +49,7 @@ class Configure { static const char *nifi_server_name; static const char *nifi_configuration_class_name; static const char *nifi_flow_repository_class_name; + static const char *nifi_volatile_repository_options; static const char *nifi_provenance_repository_class_name; static const char *nifi_server_port; static const char *nifi_server_report_interval; diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index af613a5b29..9e055f9dab 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -42,12 +42,12 @@ class ProvenanceRepository : public core::Repository, /*! * Create a new provenance repository */ - ProvenanceRepository(std::string directory = PROVENANCE_DIRECTORY, + ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) - : Repository(core::getClassName(), directory, + : Repository(repo_name.length() > 0 ? repo_name : core::getClassName(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) { db_ = NULL; diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index f35e88a7e1..c1524a2ec1 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -41,6 +41,8 @@ const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name"; const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name"; +const char *Configure::nifi_volatile_repository_options = + "nifi.volatile.repository.options."; const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; const char *Configure::nifi_server_port = "nifi.server.port"; diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index ddca87e928..45ad9804c4 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -42,7 +42,7 @@ class FlowFileRepository; #endif std::shared_ptr createRepository( - const std::string configuration_class_name, bool fail_safe) { + const std::string configuration_class_name, bool fail_safe, const std::string repo_name) { std::shared_ptr return_obj = nullptr; std::string class_name_lc = configuration_class_name; std::transform(class_name_lc.begin(), class_name_lc.end(), @@ -51,14 +51,14 @@ std::shared_ptr createRepository( std::shared_ptr return_obj = nullptr; if (class_name_lc == "flowfilerepository") { std::cout << "creating flow" << std::endl; - return_obj = instantiate(); + return_obj = instantiate(repo_name); } else if (class_name_lc == "provenancerepository") { - return_obj = instantiate(); + return_obj = instantiate(repo_name); } else if (class_name_lc == "volatilerepository") { - return_obj = instantiate(); + return_obj = instantiate(repo_name); } else if (class_name_lc == "nooprepository") { std::cout << "creating noop" << std::endl; - return_obj = instantiate(); + return_obj = instantiate(repo_name); } if (return_obj) { diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp index 2e7ff4cd41..db036f80f5 100644 --- a/libminifi/src/core/repository/VolatileRepository.cpp +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -28,6 +28,9 @@ namespace minifi { namespace core { namespace repository { +const char *VolatileRepository::volatile_repo_max_count = + "max.count"; + void VolatileRepository::run() { repo_full_ = false; } diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 9dbff36a15..585c0d32c9 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -93,7 +93,7 @@ class TestRepository : public core::Repository { class TestFlowRepository : public core::repository::FlowFileRepository { public: TestFlowRepository() - : core::repository::FlowFileRepository("./dir", 1000, 100, 0) { + : core::repository::FlowFileRepository("ff","./dir", 1000, 100, 0) { } // initialize bool initialize() { diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index d7a5524eba..993fe58372 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -73,7 +73,7 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { attributes.insert(std::pair("tomato", "tomatoe")); std::shared_ptr frepo = std::make_shared( - "./content_repository", 0, 0, 0); + "ff", "./content_repository", 0, 0, 0); std::shared_ptr ffr1 = std::make_shared< minifi::FlowFileRecord>(frepo, attributes); @@ -96,10 +96,9 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { } TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { - provenance::ProvenanceEventRecord record1( - provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", - "componenttype"); + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, + "componentid", "componenttype"); std::string eventId = record1.getEventId(); @@ -108,13 +107,14 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro uint64_t sample = 65555; - std::shared_ptr testRepository =std::make_shared(); + std::shared_ptr testRepository = std::make_shared< + core::repository::VolatileRepository>(); testRepository->initialize(0); record1.setEventDuration(sample); record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository,eventId) == true); + REQUIRE(record2.DeSerialize(testRepository, eventId) == true); REQUIRE(record2.getEventId() == record1.getEventId()); REQUIRE(record2.getComponentId() == record1.getComponentId()); REQUIRE(record2.getComponentType() == record1.getComponentType()); @@ -124,43 +124,42 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro } TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") { - provenance::ProvenanceEventRecord record1( - provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", - "componenttype"); + provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, + "componentid", "componenttype"); std::string eventId = record1.getEventId(); std::map attributes; attributes.insert(std::pair("potato", "potatoe")); attributes.insert(std::pair("tomato", "tomatoe")); - std::shared_ptr frepo =std::make_shared(); + std::shared_ptr frepo = std::make_shared< + core::repository::VolatileRepository>(); frepo->initialize(0); std::shared_ptr ffr1 = std::make_shared< - minifi::FlowFileRecord>(frepo,attributes); + minifi::FlowFileRecord>(frepo, attributes); record1.addChildFlowFile(ffr1); - uint64_t sample = 65555; - std::shared_ptr testRepository =std::make_shared(); - testRepository->initialize(0); + uint64_t sample = 65555; + std::shared_ptr testRepository = std::make_shared< + core::repository::VolatileRepository>(); + testRepository->initialize(0); record1.setEventDuration(sample); record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository,eventId) == true); + REQUIRE(record2.DeSerialize(testRepository, eventId) == true); REQUIRE(record1.getChildrenUuids().size() == 1); REQUIRE(record2.getChildrenUuids().size() == 1); std::string childId = record2.getChildrenUuids().at(0); REQUIRE(childId == ffr1->getUUIDStr()); record2.removeChildUuid(childId); REQUIRE(record2.getChildrenUuids().size() == 0); - } TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { - provenance::ProvenanceEventRecord record1( - provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", - "componenttype"); + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, + "componentid", "componenttype"); std::string eventId = record1.getEventId(); @@ -169,12 +168,12 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena uint64_t sample = 65555; - std::shared_ptr testRepository =std::make_shared(); + std::shared_ptr testRepository = std::make_shared< + core::Repository>(); testRepository->initialize(0); record1.setEventDuration(sample); record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository,eventId) == false); - + REQUIRE(record2.DeSerialize(testRepository, eventId) == false); } diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index 4b6c4ad39b..9d220307f4 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -31,7 +31,8 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { char format[] = "/tmp/testRepo.XXXXXX"; char *dir = testController.createTempDirectory(format); std::shared_ptr repository = - std::make_shared(dir, 0, 0, 1); + std::make_shared("ff", dir, 0, 0, + 1); repository->initialize(std::make_shared()); @@ -51,7 +52,8 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { char format[] = "/tmp/testRepo.XXXXXX"; char *dir = testController.createTempDirectory(format); std::shared_ptr repository = - std::make_shared(dir, 0, 0, 1); + std::make_shared("ff", dir, 0, 0, + 1); repository->initialize(std::make_shared()); @@ -73,7 +75,8 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { char format[] = "/tmp/testRepo.XXXXXX"; char *dir = testController.createTempDirectory(format); std::shared_ptr repository = - std::make_shared(dir, 0, 0, 1); + std::make_shared("ff", dir, 0, 0, + 1); repository->initialize( std::make_shared()); diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 4afb13370f..5944ff2378 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -158,14 +158,14 @@ int main(int argc, char **argv) { prov_repo_class); // Create repos for flow record and provenance std::shared_ptr prov_repo = core::createRepository( - prov_repo_class, true); + prov_repo_class, true,"provenance"); prov_repo->initialize(configure); configure->get(minifi::Configure::nifi_flow_repository_class_name, flow_repo_class); std::shared_ptr flow_repo = core::createRepository( - flow_repo_class, true); + flow_repo_class, true, "flowfile"); flow_repo->initialize(configure);