NIFI-11294 - Add support for component state as checkpointing strategy#7076
NIFI-11294 - Add support for component state as checkpointing strategy#7076malthe wants to merge 14 commits intoapache:mainfrom
Conversation
|
Note that the existing tests (which I have adapted, somewhat unsuccessfully) are not very good – they do not actually exercise the processor's logic end-to-end, but only the "meat". I think the whole test setup for this processor really needs to be reworked entirely. |
|
Will review... |
exceptionfactory
left a comment
There was a problem hiding this comment.
Thanks for implementing this feature @malthe! Just noting that the automated builds flagged a couple Checkstyle violations, and the unit tests failed due to unnecessary stubbing. That usually means there are unnecessary mock calls that need to be removed or adjusted so the when expectations are only called when needed.
|
@malthe Thanks for working on this feature, it would be a valuable addition in I tried out the processor and also had a look at the code. There is an essential issue with the new component state strategy: it does not handle the partition ownership. Also the processor cannot be stopped cleanly but throws NullPointerExceptions in a loop after Stop. Please fix these first. |
|
@exceptionfactory thanks – these issues have been fixed now. I have updated the tests in adef599 so that they're exercising the public API (i.e. |
...essors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/CheckpointStrategy.java
Show resolved
Hide resolved
|
@turcsanyip I have now implemented proper load-balancing in a9bd297 – and to fix the issues with stopping the processor, I have removed most of the processor state fields, shifting them into a event batch processing closure instead. However, what remains at this point to be tested is the component-state checkpoint store in isolation. |
|
Thanks @malthe ! |
There was a problem hiding this comment.
Thanks for the updates on this new feature @malthe! I noted several test implementation concerns, and a couple recommendations on the implementation itself.
| public void testReceiveOne() { | ||
| setProperties(); | ||
| testRunner.run(1, false); | ||
| @EnumSource(CheckpointStrategy.class) |
There was a problem hiding this comment.
Changing all of these test methods to run for both checkpoint strategy is unnecessary and results in unnecessary processing. Recommend creating a couple separate methods instead.
| @BeforeEach | ||
| public void setupProcessor() { | ||
| processor = new MockConsumeAzureEventHub(); | ||
| processor = spy(new MockConsumeAzureEventHub()); |
There was a problem hiding this comment.
The MockConsumeAzureEventHub Processor already isolates certain behavior for testing, so introducing a spy() creates and additional level of indirection. Is there a particular reason for this approach as opposed to other alternatives?
There was a problem hiding this comment.
Previously, the processor wasn't tested through the onTrigger but through some internal methods.
I changed the tests so that all testing happens through the actually exposed interface. However, this requires some additional mocking in order to implement the checkpoint strategy for example.
(The reason being, we don't have the luxury of testing against a real event hub here.)
But instead of spying on the processor, I suppose I could implement some alternative methods on the mock class. I don't remember if I tried that and faced some issues perhaps.
| EVENT_HUB_NAME, | ||
| CONSUMER_GROUP | ||
| ).blockFirst(); | ||
| assert listed.getETag().equals(claimed.getETag()); |
There was a problem hiding this comment.
The assert keyword is not intended for testing. All instances should be replaces with assertEquals() or other applicable JUnit 5 assert methods.
| new ComponentStateCheckpointStore( | ||
| identifier, | ||
| new ComponentStateCheckpointStore.State() { | ||
| public StateMap getState() throws IOException { | ||
| return session.getState(Scope.CLUSTER); | ||
| } | ||
|
|
||
| public boolean replaceState(StateMap oldValue, Map<String, String> newValue) { | ||
| return false; | ||
| } | ||
| } | ||
| ), |
There was a problem hiding this comment.
The inner anonymous class results in multiple levels of nesting that make this hard to follow. Recommend breaking it out to an explicit class.
| } else { | ||
| checkpointStore = new ComponentStateCheckpointStore( | ||
| identifier, | ||
| new ComponentStateCheckpointStore.State() { |
There was a problem hiding this comment.
Having two different anonymous inner class implementations make this hard to follow. Recommend breaking this out to a distinct class.
|
|
||
| final Map<String, String> newState = new HashMap<>(oldState.toMap()); | ||
| long timestamp = System.currentTimeMillis(); | ||
| String eTag = identifier + "/" + timestamp; |
There was a problem hiding this comment.
Should the eTag be created based on the state version as opposed the current timestamp? That seems like it would align better with the purpose of the tag for tracking ownership and ensuring consistent version control across nodes.
There was a problem hiding this comment.
I'm not sure – but that logic was copied from the blob-based checkpoint store.
In fact, whenever I could, I have used the exact same logic between them.
| final String key = entry.getKey(); | ||
| final String[] parts = key.split("/", 5); | ||
| if (parts.length != 5) { | ||
| throw new ProcessException( | ||
| String.format("Invalid %s key: %s", kind, entry.getKey()) | ||
| ); | ||
| } | ||
| if (!parts[0].equals(kind)) { | ||
| continue; | ||
| } | ||
| final String fullyQualifiedNamespace = parts[1]; | ||
| final String eventHubName = parts[2]; | ||
| final String consumerGroup = parts[3]; | ||
| final String partitionId = parts[4]; | ||
| PartitionContext partitionContext = new PartitionContext( | ||
| fullyQualifiedNamespace, | ||
| eventHubName, | ||
| consumerGroup, | ||
| partitionId | ||
| ); |
There was a problem hiding this comment.
Recommend pulling this logic out to a separate method so the formatting approach is clear.
There was a problem hiding this comment.
@malthe I tried out the processor but it seems to process all messages again and again after restarts. I cannot see the checkpoint data saved in the state (only ownership) so I think this is the root cause. Could you please check it?
Could you please also rebase your changes onto the current main branch because there are some conflicting changes?
Thanks
| AZURE_BLOB_STORAGE("azure-blob-storage", "Use Azure Blob Storage to store partition checkpoints and ownership"), | ||
| COMPONENT_STATE("component-state", "Use component state to store partition checkpoints"); |
There was a problem hiding this comment.
Could you please use "user friendly" labels like Component State on the UI?
...processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
Outdated
Show resolved
Hide resolved
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) | ||
| .required(true) | ||
| .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE) |
There was a problem hiding this comment.
The other Storage * properties below should also depend on CheckpointStrategy.AZURE_BLOB_STORAGE.
Storage Account Key property can also be mandatory as it will be shown only when it is required.
|
|
||
| public boolean replaceState(StateMap oldValue, Map<String, String> newValue) throws IOException { | ||
| final ProcessSession session = processSessionFactory.createSession(); | ||
| if (!session.replaceState(oldValue, newValue, Scope.CLUSTER)) { |
There was a problem hiding this comment.
ProcessSession.replaceState() does not provide fully proper optimistic locking because the state can be changed by another session between replaceState() and commit(). So even if replaceState() returns true, the state change may be omitted at commit time due to a concurrent update.
I suggest using ProcessContext.getStateManager().replace() instead which has the right optimistic locking semantics which is required from a checkpoint store implementation.
Please note ProcessContext.getStateManager().replace() cannot initialize the state currently (see also NIFI-11595) so it needs to be created with setState() before using replace(). E.g. in @OnScheduled like this:
@OnScheduled
public void onScheduled(ProcessContext context) throws IOException {
if (getNodeTypeProvider().isPrimary()) {
final StateManager stateManager = context.getStateManager();
final StateMap state = stateManager.getState(Scope.CLUSTER);
if (!state.getStateVersion().isPresent()) {
stateManager.setState(new HashMap<>(), Scope.CLUSTER);
}
}
}
Co-authored-by: Peter Turcsanyi <35004384+turcsanyip@users.noreply.github.com>
|
@malthe Just a heads-up that NIFI-11595 has been merged fixing the Considering that there was no activity on the PR in the past months, I would like to ask if you are still interested in this feature? Do you intend to finish the PR? I'm happy to jump in and continue the implementation based on your PR if you have other interests now. Thanks |
|
@turcsanyip please do – I am currently not actively using NiFi so just following along from the sideline. |
|
Thanks @malthe! I'll go ahead and try to finish the implementation. |
|
Thanks for your work on this @malthe, and thanks for planning to take this on @turcsanyip. I am closing the pull request for now given the state of several conflicts, but feel free to open a new one when it is ready for review. |
Summary
NIFI-11294 ConsumeAzureEventHub should default to processor state for checkpointing
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000Pull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
mvn clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation