MINIFICPP-550 - Implement RocksDB controller service and component st…#605
bakaid wants to merge 24 commits into apache:master
|
Known parts that are missing or at least should be discussed are:
|
@bakaid I think this is a pretty important piece of functionality, especially as it relates to dealing with state more gracefully on Windows (CWEL needs it, for example). Is this ready for review?
|
@phrocker I think it's important as well. I raised the questions in my previous comment here because I would like input on them, but I think someone would have to see the PR before they could be answered. So yes, by all means, I would appreciate it if someone could review this.
There was a problem hiding this comment.
@bakaid You raise some good points. I think simplicity should reign. I'm in favor of defining this through the properties and not the config yml (at least as a first step). We may get to that point, but I think we should update the schema to get there.
As a result, we can define these via the minifi properties file. This will allow us to have modes for state removal. Reload, restart, and reference based are all reasonable. As a result I would not favor using KeyValueStoreServices as processor properties quite yet, but when that time comes our schema already supports a base class. Take for example
core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("[...]")->isRequired(false)->asType<minifi::controllers::SSLContextService>()->build();
I assume that is what you mean by accepts-descendant of?
The last point is, in my opinion, easier to answer via focusing on controller services as a service based implementations for the flow and the agent itself. When considering the latter, the lifetime could persist beyond the flow, but we certainly need to be aware of this and clean up if say the flow changes and we no longer have references to those items. We have cleanup for flow files and provenance, so this reference counting isn't unheard of in the agent. The flow is an easier case where stopping these components should result in cleanup.
| void RocksDbPersistableKeyValueStoreService::notifyStop() { | ||
| AbstractAutoPersistingKeyValueStoreService::notifyStop(); | ||
|
|
||
| db_valid_ = false; |
There was a problem hiding this comment.
Would love some defensiveness here to ensure rogue extensions don't cause a memory error.
arpadboda
left a comment
There was a problem hiding this comment.
Let's resurrect this as it's good stuff!
| logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); | ||
| return false; | ||
| } | ||
| bool ListSFTP::updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { |
There was a problem hiding this comment.
Why do we use shared_ptr const&? If we can guarantee that the pointed-to object stays alive while we have a reference to it, then a regular observer pointer (plain ptr or tagged observer_ptr) states the purpose better.
This type says: "I'm a pointer to a mutable ProcessContext with shared ownership that can be reassigned by someone else to point to another mutable ProcessContext with shared ownership."
There was a problem hiding this comment.
Agreed, we use it because onTrigger already gets it this way and it is easier to just pass that to these functions.
There was a problem hiding this comment.
Are you open to changing it to core::ProcessContext& wherever the API allows that? It would simplify the API, stop saying the wrong thing (my original comment), not allow for null (except in arcane cases) and make future "de-shared_ptr-ification" refactorings easier.
There was a problem hiding this comment.
I am not really open to it in the context of this PR. This is preexisting code, and it is used this way in many processors. If you want to do a round of these modifications to make further API changes easier, that makes sense, but it should be a separate issue (and ticket).
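For readers following this thread, here is a minimal sketch of the signature change being proposed. `ProcessContext` below is a hypothetical stand-in, not the real `core::ProcessContext`, and `describeStateDir` and `onTriggerLike` are illustrative names only. The idea is that the entry point keeps its `shared_ptr`-based signature while helpers that merely observe the context take a plain reference.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for core::ProcessContext; not the real MiNiFi class.
struct ProcessContext {
  std::string state_dir;
};

// A helper that only observes the context. A plain reference states exactly
// that: no ownership, no reassignment, and no null to check for.
inline std::string describeStateDir(const ProcessContext& context) {
  return "state dir: " + context.state_dir;
}

// An entry point that keeps the existing shared_ptr-based signature (as
// onTrigger does) and dereferences once, at the boundary.
inline std::string onTriggerLike(const std::shared_ptr<ProcessContext>& context) {
  if (!context) {
    return "no context";
  }
  return describeStateDir(*context);
}
```

With this shape the null check lives in one place, and the helpers can later be reused by callers that do not hold a `shared_ptr` at all.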
| return controller_service_provider_->getControllerServiceName(identifier); | ||
| } | ||
|
|
||
| static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider"; |
There was a problem hiding this comment.
This line reminds me of stereotypical enterprise codebases. We might have too much abstraction here.
default- ok
state- stateless is generally better, but ok
manager- meaningless. Only suggests doing something, so it should probably be a verb. Having a noun in place of a verb usually means trying to use a class where a function would be better.
provider- again, a class instead of a function. This has a meaning though.
name- ok
I'm not saying that your code is bad, just that this is a red flag that suggests a design mistake that may or may not be in your changes.
There was a problem hiding this comment.
> default- ok
> state- stateless is generally better, but ok

This is a PR about creating a centralized state storage mechanism for processors that require state, instead of all of them implementing individual state files. "Stateless is generally better" is a mind-bogglingly meaningless comment here.

> manager- meaningless. Only suggests doing something, so it should probably be a verb. Having a noun in place of a verb usually means trying to use a class where a function would be better.

StateManager comes from NiFi, and it is a perfectly meaningful name: it is an object that helps manage the state of a single CoreComponent. It cannot be a function as it contains some caching and validation logic that requires it in itself to be stateful.

> provider- again, a class instead of a function. This has a meaning though.

CoreComponentStateManagerProvider needs to exist as an interface, which is used by the ClassLoader, and we access different implementations of it through that interface.

> name- ok
> I'm not saying that your code is bad, just that this is a red flag that suggests a design mistake that may or may not be in your changes.

I'm not saying your comment is useless, but it could be more useful if you tried to interpret the name in the context of the PR, instead of in itself.
There was a problem hiding this comment.
Context is always important when reviewing code. Most code has a history even if we don't like it, so it may be useful to appreciate where it comes from and how we intend to get it where we want it to go.
There was a problem hiding this comment.
Sorry for coming across as offensive. I lack a lot of contextual knowledge, and at the time of writing the original comment I knew even less.
I maintain my standpoint that there is most likely too much abstraction, but now I understand that changing this would require a redesign of significant parts of the project and that it would be harmful to do so without serious thought.
| namespace minifi { | ||
| namespace controllers { | ||
|
|
||
| class PersistableKeyValueStoreService : virtual public KeyValueStoreService, public AbstractCoreComponentStateManagerProvider { |
There was a problem hiding this comment.
I believe representing a combination of these capabilities via composition would be better design than representing them via inheritance.
There was a problem hiding this comment.
PersistableKeyValueStoreService is just an interface extension of KeyValueStoreService, and forcefully wrapping a PersistableKeyValueStoreService into some class that provides the CoreComponentStateManagerProvider interface after loading the PersistableKeyValueStoreService with the classloader would make it impossible to implement and use a CoreComponentStateManagerProvider directly.
This way everything "just works": if you implement a PersistableKeyValueStoreService you automatically have a CoreComponentStateManagerProvider, but you can implement a CoreComponentStateManagerProvider directly.
| std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, | ||
| const std::string& id) | ||
| : provider_(std::move(provider)) | ||
| , id_(id) |
There was a problem hiding this comment.
I prefer pass-by-value and move. I'd use it for id as well, not only provider.
- It allows moving in values without extra overloads.
- It's noexcept.
- It costs one extra move construction + destruction.
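A minimal sketch of the pass-by-value-and-move constructor suggested above. `Provider` and `StateManager` are simplified, hypothetical stand-ins for the classes in the diff. Both parameters are taken by value and moved into the members, so an lvalue argument costs one copy plus one move and an rvalue argument costs two moves.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for AbstractCoreComponentStateManagerProvider.
struct Provider {};

// Simplified, hypothetical state manager that only stores its two arguments.
class StateManager {
 public:
  // Both sink parameters are taken by value and moved into the members.
  StateManager(std::shared_ptr<Provider> provider, std::string id)
      : provider_(std::move(provider)),
        id_(std::move(id)) {
  }

  const std::string& id() const { return id_; }
  const std::shared_ptr<Provider>& provider() const { return provider_; }

 private:
  std::shared_ptr<Provider> provider_;
  std::string id_;
};
```

Callers that still need their string pass it as an lvalue and keep it; callers that are done with it can `std::move` it in, and no second overload is needed.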
| const char* key = "foobar"; | ||
| const char* value = "234"; | ||
| const char* new_value = "baz"; | ||
| REQUIRE(true == controller->set(key, value)); |
There was a problem hiding this comment.
Comparing a boolean expression to true is redundant.
There was a problem hiding this comment.
Yep, but in tests I think it is more readable.
|
@arpadboda @szaszm The PR is ready for review, with the following issues. Blocking the PR:
Follow-up issues:
All the processors that (to my knowledge) used state have been rewritten to use the new mechanism:
|
| } | ||
| enqueue_c2_response(std::move(response)); | ||
| } | ||
| } else if (resp.name == "corecomponentstate") { |
There was a problem hiding this comment.
This has only been tested manually. Once #743 is done and merged, I can probably write reasonable tests for it.
There was a problem hiding this comment.
Added C2DescribeCoreComponentStateTest for this.
| update_sink_->drainRepositories(); | ||
| C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true); | ||
| enqueue_c2_response(std::move(response)); | ||
| } else if (resp.name == "corecomponentstate") { |
There was a problem hiding this comment.
This has only been written to make sure that it fits the architecture, it hasn't even been tested manually. Once #743 is done and merged, I can probably write reasonable tests for it.
There was a problem hiding this comment.
Could you add a TODO: untested comment to the top of the block?
There was a problem hiding this comment.
Let's revisit this after rebasing.
There was a problem hiding this comment.
This is unfortunately still very much non-trivial to do (unlike testing DESCRIBE, which I do now), so I've added a TODO for it as suggested.
arpadboda
left a comment
There was a problem hiding this comment.
Mostly looks good, added some comments.
Didn't finish reviewing, further comments/questions are expected.
|
|
||
| rocksdb::DB* db_; | ||
| rocksdb::WriteOptions default_write_options; | ||
| bool db_valid_; |
There was a problem hiding this comment.
Why do we need this? The pointer being null could indicate the db not being valid.
The documentation of rocksdb::open seems to fit in this aspect:
// Stores nullptr in *dbptr and returns a non-OK status on error.
(source: https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h )
There was a problem hiding this comment.
It was a remnant from the UnorderedMap version (which was implemented before this). Replaced along with using a unique_ptr.
| protected: | ||
| std::string directory_; | ||
|
|
||
| rocksdb::DB* db_; |
There was a problem hiding this comment.
Wonder if it could be a unique ptr
There was a problem hiding this comment.
None of our rocksdb stuff uses a unique_ptr, but it can. Rewritten.
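The two suggestions above (drop the separate validity flag, own the handle through a `unique_ptr`) can be sketched as follows. To stay self-contained this does not use RocksDB at all: `FakeDb` and `openFakeDb` are hypothetical stand-ins that only mimic the quoted contract of storing `nullptr` in the out-parameter on error.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for a database handle; not a RocksDB type.
struct FakeDb {
  std::string path;
};

// Mimics the contract quoted above: on error, store nullptr in *dbptr and
// report failure. Here an empty path plays the role of the error.
inline bool openFakeDb(const std::string& path, FakeDb** dbptr) {
  if (path.empty()) {
    *dbptr = nullptr;
    return false;
  }
  *dbptr = new FakeDb{path};
  return true;
}

class Store {
 public:
  bool initialize(const std::string& path) {
    db_.reset();  // close any previously opened handle first
    FakeDb* raw = nullptr;
    if (!openFakeDb(path, &raw)) {
      return false;  // db_ stays null, which doubles as "not valid"
    }
    db_.reset(raw);  // take ownership of the raw handle
    return true;
  }

  // A null pointer replaces the separate db_valid_ flag.
  bool isOpen() const { return db_ != nullptr; }

  void notifyStop() { db_.reset(); }

 private:
  std::unique_ptr<FakeDb> db_;
};
```

Because the pointer itself carries the "valid" state, there is no second member that can drift out of sync with it, and no explicit `delete` is needed on the stop path.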
| delete db_; | ||
| } | ||
| rocksdb::Options options; | ||
| options.create_if_missing = true; |
There was a problem hiding this comment.
We will need the buffer options to be set here the same way as we did for FF repo, otherwise the persistent storage will slowly generate the same 64 megabyte peaks we just got rid of.
There was a problem hiding this comment.
Good point, this was written last summer, we didn't know this back then.
| } | ||
|
|
||
| bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) { | ||
| if (!db_valid_) { |
There was a problem hiding this comment.
Wonder if we could utilize the nullcheck macro here (used in nanofi)
There was a problem hiding this comment.
Replaced with unique_ptr, so no longer relevant.
| logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str()); | ||
| std::unordered_map<std::string, std::string> state_map; | ||
| if (!state_manager_->get(state_map)) { | ||
| logger_->log_debug("Failed to get state from StateManager"); |
There was a problem hiding this comment.
Just debug?
Sometimes it's ok, but I wonder if it should be at least info.
There was a problem hiding this comment.
If the processor has no stored state yet, state_manager_->get will return with false.
It could be modified to return an empty map, but I think there is value in having a distinction between "we have already stored a state, we just have nothing to store" and "we have no stored state yet".
There was a problem hiding this comment.
That's completely reasonable. My focus was on the error case: for example, the persisted state is corrupted or deleted on disk, so processors unexpectedly start from a clean state. That is worth at least an info-level log message, in my opinion.
Yes, I also expect the state manager to log something in such a scenario, although loading persisted state shouldn't happen too frequently (onschedule phase), so I don't mind being quite verbose here.
There was a problem hiding this comment.
If we can make a distinction of such cases AND (if the state storage is present, but corrupted, OR the state storage is not present, but expected), then I'd like to log a warning, not an info.
The current API doesn't seem to allow for such distinction and I'm not sure whether modifying it is worth it or even possible.
| logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str()); | ||
| bookmarkXml.clear(); | ||
| return createEmptyBookmarkXmlFile(); | ||
| return false; |
There was a problem hiding this comment.
This seems to be misindented.
There was a problem hiding this comment.
Thanks, since these are Windows-only, I edited them in VS when migrating them, and well... this is the result.
There was a problem hiding this comment.
If you find a simple way to disable automatic reformatting in VS, I'm interested as well. I sometimes have to recreate my windows changes on my linux editor to avoid huge diffs due to automatic reformatting.
There are a handful of options, but no obvious (to me) way to disable messing with unchanged code.
| utils::file::FileUtils::concat_path(bookmarkRootDir, "uuid"), uuid), "Bookmark.txt"); | ||
|
|
||
| std::wstring bookmarkXml; | ||
| if (getBookmarkXmlFromFile(bookmarkXml)) { |
There was a problem hiding this comment.
There are some spaces/tabs issues here and above in this function as well.
| namespace minifi { | ||
| namespace controllers { | ||
|
|
||
| class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService, |
There was a problem hiding this comment.
What's the motivation behind this class?
The implementation looks good, but I'm not sure there is any value in having and maintaining a third store service.
I wonder if we could use a very small and simple third-party KV store for this purpose instead of having our own implementation.
The non-serialized one makes perfect sense for "no footprint" usage; it can be configured together with volatile repos.
The rocksdb one aligns perfectly with the rocksdb repos.
This one stands somewhere in the middle, to ensure persistency when rocksdb is not available, although I feel that we are introducing something we will have to support for a long time and that doesn't really add value.
There was a problem hiding this comment.
The motivation is twofold: to provide an implementation without any third party requirements, and to provide a serialized format which is human-readable and easily editable.
As for the first motivation: we don't want to make it a hard requirement to have rocksdb to use stateful processors, because that would hinder low-footprint usages, nor do I want to make any other third party a dependency for this: it would, again, introduce an unwanted requirement for low-footprint usages and we would have to find one that supports all target platforms, and maintain it, which is not a negligible maintenance burden.
For the second one: so far users could easily clear a state by deleting the state file or directory. With the rocksdb state storage this is no longer a trivial task. The C2 methods introduced here attempt to solve this problem, but until they are finalized (or if someone does not want to use, or can't use C2), they aren't really useful.
A solution for this is using this state storage method: its serialized format is trivial and editable in text format without any third party tools, making it easy to clear (or even modify) the state of processors.
Finally for the support argument: this is a trivial, and, if you take a look at PersistableKeyValueStoreServiceTest.cpp, well-tested implementation, that I don't think would introduce a significant maintenance burden going forward.
| add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml") | ||
| if (NOT DISABLE_ROCKSDB) | ||
| add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml") | ||
| endif() No newline at end of file |
There was a problem hiding this comment.
missing line break at the end of the file
| if (state_dir == nullptr) { | ||
| char state_dir_name_template[] = "/tmp/teststate.XXXXXX"; | ||
| state_dir_ = utils::file::FileUtils::create_temp_directory(state_dir_name_template); | ||
| } else { | ||
| state_dir_ = state_dir; | ||
| } | ||
| state_manager_provider_ = core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_, configuration_, state_dir_.c_str()); |
There was a problem hiding this comment.
We should move the state dir to /var/tmp to avoid breaking the tests on tmpfs when using rocksdb storage, as described in MINIFICPP-1188.
| auto controller_service_provider = yaml_ptr->getControllerServiceProvider(); | ||
| char state_dir_name_template[] = "/tmp/integrationstate.XXXXXX"; | ||
| state_dir = utils::file::FileUtils::create_temp_directory(state_dir_name_template); | ||
| core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration, state_dir.c_str()); |
There was a problem hiding this comment.
We should move the state dir to /var/tmp to avoid breaking the tests on tmpfs when using rocksdb storage, as described in MINIFICPP-1188.
| try { | ||
| state_listing_timestamp = stoull(state_map.at("listing.timestamp")); | ||
| } catch (...) { | ||
| return false; | ||
| } | ||
| std::string key = line.substr(0, separator_pos); | ||
| std::string value = line.substr(separator_pos + 1); | ||
| if (key == "hostname") { | ||
| state_hostname = std::move(value); | ||
| } else if (key == "username") { | ||
| state_username = std::move(value); | ||
| } else if (key == "remote_path") { | ||
| state_remote_path = std::move(value); | ||
| } else if (key == "listing.timestamp") { | ||
| try { | ||
| state_listing_timestamp = stoull(value); | ||
| } catch (...) { | ||
| logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); | ||
| return false; | ||
| } | ||
| } else if (key == "processed.timestamp") { | ||
| try { | ||
| state_processed_timestamp = stoull(value); | ||
| } catch (...) { | ||
| logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str()); | ||
| return false; | ||
| try { | ||
| state_processed_timestamp = stoull(state_map.at("processed.timestamp")); | ||
| } catch (...) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
The inner catch-alls are redundant, since the outer one does the same: return false.
I'd prefer to have the log messages restored, which may make nested try blocks necessary again.
There was a problem hiding this comment.
Added individual logging.
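One way to keep per-key log messages without nesting a try block per key is a small helper like the sketch below. This is not the code that was added to the PR; `readUint64` is a hypothetical name, and a vector of strings stands in for `logger_->log_error` so the example is self-contained.

```cpp
#include <cstdint>
#include <exception>
#include <string>
#include <unordered_map>
#include <vector>

// Reads state_map[key] as an unsigned 64-bit integer. On failure, records a
// message naming the offending key and returns false. `errors` is a stand-in
// for the logger.
inline bool readUint64(const std::unordered_map<std::string, std::string>& state_map,
                       const std::string& key,
                       std::uint64_t& out,
                       std::vector<std::string>& errors) {
  const auto it = state_map.find(key);
  if (it == state_map.end()) {
    errors.push_back(key + " is missing from the stored state");
    return false;
  }
  try {
    out = std::stoull(it->second);
    return true;
  } catch (const std::exception&) {
    // std::stoull throws std::invalid_argument or std::out_of_range.
    errors.push_back(key + " is not an uint64");
    return false;
  }
}
```

Note that `std::stoull` accepts a numeric prefix (for example `"12abc"` parses as 12), so stricter validation would need to check the consumed length as well.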
| std::mt19937 gen(std::random_device { }()); | ||
| std::generate_n(line1.begin(), 4095, [&]() -> char { | ||
| return 32 + gen() % (127 - 32); | ||
| return 32 + gen() % (127 - 32); |
There was a problem hiding this comment.
The old indentation looks correct to me.
std::uniform_int_distribution{ 32, 126 } would be a more readable way of generating these characters, if you're interested in changing this piece of old code.
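A minimal sketch of the `std::uniform_int_distribution` suggestion. `randomPrintable` is a hypothetical helper name; the range 32 to 126 matches the printable ASCII range produced by the original `32 + gen() % (127 - 32)` expression. The distribution is instantiated with `int` because `char` is not an allowed type for it.

```cpp
#include <algorithm>
#include <cstddef>
#include <random>
#include <string>

// Fills a string of the given length with printable ASCII characters
// (codes 32 through 126, inclusive).
inline std::string randomPrintable(std::size_t length, std::mt19937& gen) {
  std::uniform_int_distribution<int> dist(32, 126);
  std::string line(length, '\0');
  std::generate_n(line.begin(), length, [&]() {
    return static_cast<char>(dist(gen));
  });
  return line;
}
```

Besides reading more clearly, the distribution avoids the slight modulo bias of the `%` version, although that hardly matters for test data.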
| std::lock_guard<std::mutex> lock(configuration_mutex_); | ||
|
|
||
| for (auto item : properties) { | ||
| properties_[item.getName()] = item; |
There was a problem hiding this comment.
We can avoid both copies of each property.
for (auto& item: properties) {
const auto name = item.getName();
properties_[name] = std::move(item);
}
There was a problem hiding this comment.
This function is a modified version of ConfigurableComponent::setSupportedProperties. Yes, the copies can be avoided both here and there, but it is completely inconsequential, as this happens only once per flow initialization.
| for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { | ||
| parseStateFileLine(buf); | ||
|
|
||
| /* We could not get the state form the StateManager, try to migrate the old state file if it exists */ |
| AbstractCoreComponentStateManagerProvider::~AbstractCoreComponentStateManagerProvider() { | ||
| } |
There was a problem hiding this comment.
It's possible to use = default; in .cpp files as well.
|
@arpadboda @szaszm Rebased to latest master (including C2 refactor), added test for C2 DESCRIBE corecomponentstate, addressed review comments.
|
I am done with my final verification, the following has been tested successfully:
|
|
Built and tested, seems to work based on my tests as well, so LGTM, merging this.
…ate storage