Skip to content

NIFI-14283 ConsumeKinesisStream non-static workerLock#9737

Merged
exceptionfactory merged 1 commit intoapache:mainfrom
dariuszseweryn:NIFI-14283
Feb 24, 2025
Merged

NIFI-14283 ConsumeKinesisStream non-static workerLock#9737
exceptionfactory merged 1 commit intoapache:mainfrom
dariuszseweryn:NIFI-14283

Conversation

@dariuszseweryn
Copy link
Contributor

Summary

NIFI-14283

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • (there are none) New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • (there are none) New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • (no changes) Documentation formatting appears as expected in rendered files

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking at improving this behavior @dariuszseweryn.

Although moving from a static object to an instance variable is a step forward, it seems like this is worth some additional evaluation. For example, should the Scheduler thread be created in the onScheduled method, but only started in onTrigger? It may also be more efficient to use an atomic variable, as opposed to the synchronized lock object approach.

@dariuszseweryn
Copy link
Contributor Author

For example, should the Scheduler thread be created in the onScheduled method, but only started in onTrigger?

This is possible but has a downside. Scheduler needs a record processor which uses a ProcessSessionFactory that is available only in onTrigger(). The processor won't get called before the thread the Scheduler runs on is started, yet it would introduce additional state in the record processor which is definitely a downside.

The question is: is the advantage bigger than the disadvantage?
imho not given the creation is fairly infrequent

It may also be more efficient to use an atomic variable, as opposed to the synchronized lock object approach.

I may lack experience with NiFi but it seems to me that start and stop events are fairly infrequent for processors — this indicates there will be little contention especially when the lock is inherent to a processor instance, no?

If so, having an AtomicBoolean or other Atomic* will only unnecessary complicate the code.

@exceptionfactory
Copy link
Contributor

Thanks for the reply!

For example, should the Scheduler thread be created in the onScheduled method, but only started in onTrigger?

This is possible but has a downside. Scheduler needs a record processor which uses a ProcessSessionFactory that is available only in onTrigger(). The processor won't get called before the thread the Scheduler runs on is started, yet it would introduce additional state in the record processor which is definitely a downside.

Good point regarding the ProcessSessionFactory. That actually highlights a larger concern in that having the Processor hold on to the Session Factory in this way is generally poor practice and should be revisited.

The question is: is the advantage bigger than the disadvantage? imho not given the creation is fairly infrequent

It may also be more efficient to use an atomic variable, as opposed to the synchronized lock object approach.

I may lack experience with NiFi but it seems to me that start and stop events are fairly infrequent for processors — this indicates there will be little contention especially when the lock is inherent to a processor instance, no?

If so, having an AtomicBoolean or other Atomic* will only unnecessary complicate the code.

Starting and stopping is less frequent, but the framework calls onTrigger() very frequently, thus inefficient nature of the synchronized block in a method that the framework calls repeatedly.

@dariuszseweryn
Copy link
Contributor Author

dariuszseweryn commented Feb 21, 2025

That actually highlights a larger concern in that having the Processor hold on to the Session Factory in this way is generally poor practice and should be revisited.

Given the Scheduler produces records at arbitrary moments we can:

  1. On each onTrigger() substitute the factory in the processor regularly
  2. Buffer the records and submit them only onTrigger() — this would need to have manual checkpointing, not sure yet how cumbersome will it be

The problem here is that the processor is an adapter between two opinionated frameworks. The adapter can be made at the expense of complexity. This begs a question – besides ugliness – what are the downsides of holding to the Session Factory?

Starting and stopping is less frequent, but the framework calls onTrigger() very frequently, thus inefficient nature of the synchronized block in a method that the framework calls repeatedly.

The synchronization is only used if there is no Scheduler created yet, it uses (searches notes) double-checked locking to avoid the synchronization overhead

@dariuszseweryn
Copy link
Contributor Author

@exceptionfactory I have broken out the Session Factory issue into a separate JIRA: NIFI-14292

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for discussing the options @dariuszseweryn. Tracking the more substantive changes in a separate Jira issue sounds good. As this pull request is a useful incremental adjustment, I will proceed to merge and we can consider the larger questions under the new Jira issue you created.

@exceptionfactory exceptionfactory merged commit 96eea40 into apache:main Feb 24, 2025
6 checks passed
@dariuszseweryn dariuszseweryn deleted the NIFI-14283 branch June 6, 2025 09:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants