
NIFI-2892 Implement AWS Kinesis Stream Get Processor #4822

Closed
ChrisSamo632 wants to merge 15 commits into apache:main from ChrisSamo632:NIFI-2892

Conversation

@ChrisSamo632
Contributor

@ChrisSamo632 ChrisSamo632 commented Feb 11, 2021

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Enables fetching data from AWS Kinesis Data Streams.

Builds upon the original PR from @udaykale (and comments there by @turcsanyip and @jaynpearl).

Based largely upon the AWS Kinesis Consumer example classes.

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on JDK 8?
  • Have you verified that the full build is successful on JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

@bhaveshpatelh

Thanks! Looking forward to this piece going out

@ChrisSamo632 ChrisSamo632 force-pushed the NIFI-2892 branch 3 times, most recently from 5d26754 to 8231dc3 February 17, 2021 19:15
@auyer

auyer commented Mar 3, 2021

Hi ! I'm interested in testing this out.
I currently run Apache Nifi using the base Docker images. How can I proceed to include the Kinesis Get Processor into my images?
Thanks ! Hope I can help!

@ChrisSamo632
Contributor Author

> Hi ! I'm interested in testing this out.
> I currently run Apache Nifi using the base Docker images. How can I proceed to include the Kinesis Get Processor into my images?
> Thanks ! Hope I can help!

@auyer I think you'd need to download the source from my branch, build it locally, and then copy the new NARs into the lib/ folder of your image (overwriting the existing AWS NARs). You can probably build just the AWS NARs rather than the whole of NiFi to speed up the process; if you build the whole of NiFi, you might as well run the compiled version natively rather than using the Docker image.

So, something like:

# clone the code and switch to the PR branch
git clone https://github.com/ChrisSamo632/nifi.git
cd nifi
git checkout NIFI-2892

# build only the AWS bundle
cd nifi-nar-bundles/nifi-aws-bundle
mvn -T 2.0C clean install

# copy the built NARs to /tmp
find . -name "*.nar" -exec cp {} /tmp \;

Copy these into a custom Docker Image with something like the following:

FROM apache/nifi:1.13.0

RUN rm lib/nifi-aws*.nar

COPY [ "*.nar", "lib/" ]

Then run the custom image in the way you normally would.

Note that I've not tested the above, so you may need to correct the instructions as you go. When developing NiFi, I just build from source and run natively; I've not got to the point of trying to copy the NARs into a Docker container yet, although that is how I run NiFi normally.

@auyer

auyer commented Mar 10, 2021

I've been testing this for a few days and have had no issues so far. I should say that I'm not running NiFi in production yet, but I plan to do so soon, and this feature helps a lot!
If anyone else wants to try it in Docker, I've built an image following the instructions above and published it publicly on Docker Hub.

Note: I will remove this repo after this PR gets into the next release.

@bhaveshpatelh

@ChrisSamo632 Any timelines for merging/releasing it out?
We have a use-case and wanted to use it in NiFi production.

@ChrisSamo632
Contributor Author

ChrisSamo632 commented Mar 23, 2021

> @ChrisSamo632 Any timelines for merging/releasing it out?
> We have a use-case and wanted to use it in NiFi production.

@bhaveshpatelh it's ready to go as far as I'm concerned, but it needs someone to review and then a committer to merge it (there's a big backlog of PRs, I've no idea when anyone will get to this... much as I'd like it in too because we want to use it in production)

@pvillard31
Contributor

@ChrisSamo632 - out of curiosity, did you consider the addition of a record reader/writer like with ConsumeKafkaRecord processors? At the moment, as far as I can tell, we would have one record per flow file. Using the records abstraction would provide options such as schema validation and format transformation, as well as allowing multiple records in one flow file (which greatly improves performance when there is a high number of messages per second).

@ChrisSamo632
Contributor Author

ChrisSamo632 commented Mar 23, 2021

> @ChrisSamo632 - out of curiosity, did you consider the addition of a record reader/writer like with ConsumeKafkaRecord processors? At the moment, as far as I can tell, we would have one record per flow file. Using the records abstraction would provide options such as schema validation and format transformation, as well as allowing multiple records in one flow file (which greatly improves performance when there is a high number of messages per second).

@pvillard31 think I considered this in our original Slack conversation (or was that with @bbende?), but thought I'd leave out record writers at the minute for simplicity and to better understand how the Kinesis message (vs. Record) structure works

With the KCL worker and multi-threaded consumer approach taken by the KCL library, we'd need a way of combining the records in the processor too (how do we combine records from different consumers? How about where the consumers are reading from different shards and/or multiple consumers from the same shard?)

So I thought a record writer may be a sensible extension (fully agree it would be good to include from a nifi perspective) once there's more understanding of how people use the processor and how it works with different kinesis setups (so far I've only really tested it with simple streams)

@turcsanyip
Contributor

turcsanyip commented Mar 26, 2021

@ChrisSamo632 Thanks for picking up this task and implementing this non-trivial processor!

I did not review the code in detail (yet) but spotted some issues regarding thread handling:

The onTrigger() method should not block its thread. In NiFi, there is a shared thread pool for executing all processors' onTrigger() methods. If some processors held up execution threads, it could lead to starvation of other processors.
I think a similar pattern could be used as in case of ConsumeAzureEventHub: start up the Worker at the first execution of onTrigger() and then just yield in the subsequent calls.
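
A minimal sketch of the "start once, then yield" pattern described above, in plain Java with no NiFi or KCL dependency. `FakeWorker` and `LazyStartProcessor` are hypothetical names invented for illustration; the counter stands in for a call such as `context.yield()`, and the real processor would start a KCL Worker rather than a sleep loop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a long-running consumer such as the KCL Worker.
class FakeWorker implements Runnable {
    private volatile boolean stopRequested = false;

    @Override
    public void run() {
        try {
            while (!stopRequested) {
                Thread.sleep(10); // pretend to poll for records
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        stopRequested = true;
    }
}

// Hypothetical processor: onTrigger() never blocks the caller's thread.
class LazyStartProcessor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private FakeWorker worker;
    final AtomicInteger yields = new AtomicInteger();

    // Called repeatedly by the framework; must return quickly.
    synchronized void onTrigger() {
        if (worker == null) {
            worker = new FakeWorker();
            executor.submit(worker); // the worker runs on its own thread
        } else {
            yields.incrementAndGet(); // stands in for yielding the processor
        }
    }

    // Stops the worker; returns true if its thread finished within the timeout.
    // This sketch does not support restarting after a stop.
    synchronized boolean onStopped() {
        if (worker != null) {
            worker.shutdown();
            worker = null;
        }
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Only the first `onTrigger()` call does any setup; every later call returns immediately, so the shared execution thread is released straight away.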

I believe one Worker per processor should be enough and it is not necessary to maintain a pool of workers. A single Worker can run multiple threads for executing the Kinesis RecordProcessors. As far as I saw, the Worker spins up a thread for each assigned shard by default (RecordProcessor-xxxx threads). So the parallel processing is provided with one Worker too. The code is simpler in this way and there is less overhead at runtime (each Worker has its own "maintenance" threads like LeaseRenewer-xxxx, LeaseCoordinator-xxxx).

The processor cannot stop cleanly due to a bug in KCL. Interestingly, it has just been reported to AWS by someone else: awslabs/amazon-kinesis-client#796
When the processor shuts down the Worker, the Worker leaves a thread running. Stopping and starting the processor multiple times would therefore leak threads. Furthermore, the "zombie" thread(s) prevent NiFi from shutting down properly:
2021-03-26 12:12:52,715 WARN [main] org.apache.nifi.bootstrap.Command NiFi has not finished shutting down after 20 seconds. Killing process.
For this reason, I think we need to downgrade the KCL version to 1.13.3 until the bug is fixed.

And an idea: the way the processor receives data via KCL is quite similar to other Consume*** processors (like the already mentioned ConsumeAzureEventHub or ConsumeAMQP), so I would consider naming the processor ConsumeKinesisStream instead of Get~ (there is actually a Get-like/polling API for Kinesis too, and another processor may be implemented later using that API).

@ChrisSamo632
Contributor Author

@turcsanyip thanks for the comments. I'd been a bit confused/concerned about the thread handling and tried (but failed) to find a suitable example. What you suggest looks like a sensible approach on first inspection; I'll try to make the changes sooner rather than later.

Good spot with the KCL issue, will try the downgrade as you suggest (unless a fix comes from AWS in the meantime)

@pvillard31 I'll look again at using Record Reader/Writer at the same time in a fashion similar to the processors suggested above (although I may still suggest it as a future improvement ticket)

@ChrisSamo632
Contributor Author

ChrisSamo632 commented Mar 27, 2021

@turcsanyip I've started to have a look at making these changes. Using KCL 1.13.3 instead of 1.14.x appears to be straightforward (i.e. no big API changes).

The refactoring to use a single Worker per processor and yield once the Worker has been set up looks fairly straightforward, and I agree it is a simpler implementation. However, if I follow the ConsumeAzureEventHub approach and use the ProcessSessionFactory version of onTrigger, I have to extend the AbstractSessionFactoryProcessor base class, whereas the existing AWS processors have a fairly long inheritance chain that extends from AbstractProcessor. I don't want to separate this processor from all the existing AWS processors, and moving all AWS processors to the different base class feels like a big change and not something we'd want to do.

Is there a straightforward way around this, do you think? One concern I had with the original implementation was that I was holding on to a single ProcessSession and committing it multiple times (i.e. after every set of messages had been processed by the KCL RecordProcessor). Using the ProcessSessionFactory approach to create a new session every time a new set of Kinesis messages is received would seem better, aside from the above issue of the different abstract processors.

I'll look at this further, but any guidance is welcome!

@pvillard31 having looked again at the Record processing now that I have the Azure processor to compare with, I think including it here should be fairly straightforward (famous last words).

EDIT: the Session Factory problem can be sorted by changing the base abstract AWS processor to extend AbstractSessionFactoryProcessor and copying the AbstractProcessor methods into it (but without making the overridden onTrigger method final). This retains the same functionality for most AWS processors but allows the ConsumeKinesisStream processor access to the ProcessSessionFactory. An alternative would be to remove the final modifier from the AbstractProcessor onTrigger that accepts the factory (but that impacts nearly all processors, and the method is presumably final for good reason).

@ChrisSamo632 ChrisSamo632 force-pushed the NIFI-2892 branch 2 times, most recently from 5a063c6 to 58d89e8 March 29, 2021 20:19
@ChrisSamo632
Contributor Author

@turcsanyip updated per your suggestions.

Note that this includes:

  • added (optional) Record Reader/Writer (@pvillard31 FYI)
  • better Thread/Session handling for the use of Kinesis Client Library
  • reverted to KCL version 1.13.3 due to amazon-kinesis-client#796
  • removed double-validation of dynamic properties (introduced by NIFI-8266 @mattyb149 FYI in case you see an issue with my change to AbstractConfigurableComponent)
  • change to AbstractProcessor to remove the final modifier of the onTrigger method that takes the ProcessSessionFactory - by default, processors extending this class will still need to implement the onTrigger method that takes the ProcessSession, but can now optionally override the former (this seemed a better option in the end than copying the contents of AbstractProcessor into AbstractAWSProcessor, but happy for that to be challenged)
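
The last bullet can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the actual NiFi source: `Session`, `SessionFactory`, `BaseProcessor`, `SimpleProcessor` and `BatchingProcessor` are hypothetical stand-ins for ProcessSession, ProcessSessionFactory, AbstractProcessor and two subclasses, and error handling such as rollback is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a process session.
class Session {
    final List<String> log;

    Session(List<String> log) {
        this.log = log;
    }

    void commit() {
        log.add("commit");
    }
}

// Hypothetical stand-in for a session factory; records what happens for inspection.
class SessionFactory {
    final List<String> log = new ArrayList<>();

    Session createSession() {
        log.add("create");
        return new Session(log);
    }
}

// Base class: the factory overload is deliberately NOT final in this sketch.
abstract class BaseProcessor {
    void onTrigger(SessionFactory factory) {
        Session session = factory.createSession();
        onTrigger(session);
        session.commit();
    }

    abstract void onTrigger(Session session);
}

// Ordinary processor: implements only the session overload, as before.
class SimpleProcessor extends BaseProcessor {
    @Override
    void onTrigger(Session session) {
        session.log.add("simple work");
    }
}

// Consumer-style processor: overrides the factory overload so that
// every batch of received records gets its own session.
class BatchingProcessor extends BaseProcessor {
    @Override
    void onTrigger(SessionFactory factory) {
        for (int batch = 0; batch < 2; batch++) {
            Session session = factory.createSession();
            session.log.add("batch " + batch);
            session.commit();
        }
    }

    @Override
    void onTrigger(Session session) {
        throw new UnsupportedOperationException("uses the factory overload instead");
    }
}
```

If the factory overload in `BaseProcessor` were declared `final`, `BatchingProcessor` would fail to compile with an "overridden method is final" error.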

@bhaveshpatelh

@ChrisSamo632 When I tried the following commands to build the processor, the build failed with an error.
Can you guide me on the next steps?

# clone the code
git clone https://github.com/ChrisSamo632/nifi.git
git checkout NIFI-2892

# build the code
cd nifi/nifi-nar-bundles/nifi-aws-bundle
mvn -T 2.0C clean install
[INFO] -------------------------------------------------------------
[WARNING] COMPILATION WARNING : 
[INFO] -------------------------------------------------------------
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/strategies/AnonymousCredentialsStrategy.java:[26,29] [deprecation] StaticCredentialsProvider in com.amazonaws.internal has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java:[22,32] [deprecation] AmazonS3EncryptionClient in com.amazonaws.services.s3 has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[24,32] [deprecation] AmazonS3EncryptionClient in com.amazonaws.services.s3 has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[25,38] [deprecation] CryptoConfiguration in com.amazonaws.services.s3.model has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/strategies/AccessKeyPairCredentialsStrategy.java:[26,29] [deprecation] StaticCredentialsProvider in com.amazonaws.internal has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java:[272,73] [deprecation] getHeaders() in HttpResponse has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/strategies/AnonymousCredentialsStrategy.java:[44,19] [deprecation] StaticCredentialsProvider in com.amazonaws.internal has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/strategies/AssumeRoleCredentialsStrategy.java:[163,55] [deprecation] AWSSecurityTokenServiceClient(AWSCredentialsProvider,ClientConfiguration) in AWSSecurityTokenServiceClient has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/strategies/AssumeRoleCredentialsStrategy.java:[165,32] [deprecation] setEndpoint(String) in AWSSecurityTokenService has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java:[59,8] [deprecation] AmazonS3EncryptionClient in com.amazonaws.services.s3 has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java:[59,46] [deprecation] AmazonS3EncryptionClient in com.amazonaws.services.s3 has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCEncryptionStrategy.java:[59,42] [deprecation] AmazonS3EncryptionClient(AWSCredentialsProvider,EncryptionMaterialsProvider) in AmazonS3EncryptionClient has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java:[251,15] [deprecation] AmazonCloudWatchClient(AWSCredentialsProvider,ClientConfiguration) in AmazonCloudWatchClient has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[51,8] [deprecation] CryptoConfiguration in com.amazonaws.services.s3.model has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[51,47] [deprecation] CryptoConfiguration in com.amazonaws.services.s3.model has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[57,8] [deprecation] AmazonS3EncryptionClient in com.amazonaws.services.s3 has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[57,46] [deprecation] AmazonS3EncryptionClient in com.amazonaws.services.s3 has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java:[57,42] [deprecation] AmazonS3EncryptionClient(AWSCredentialsProvider,EncryptionMaterialsProvider,CryptoConfiguration) in AmazonS3EncryptionClient has been deprecated
[WARNING] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/factory/strategies/AccessKeyPairCredentialsStrategy.java:[49,19] [deprecation] StaticCredentialsProvider in com.amazonaws.internal has been deprecated
[INFO] 19 warnings 
[INFO] -------------------------------------------------------------
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java:[486,16] error: onTrigger(ProcessContext,ProcessSessionFactory) in ConsumeKinesisStream cannot override onTrigger(ProcessContext,ProcessSessionFactory) in AbstractProcessor
  overridden method is final
[INFO] 1 error
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for nifi-aws-bundle 1.14.0-SNAPSHOT:
[INFO] 
[INFO] nifi-aws-bundle .................................... SUCCESS [  2.902 s]
[INFO] nifi-aws-service-api ............................... SUCCESS [  5.103 s]
[INFO] nifi-aws-abstract-processors ....................... SUCCESS [  8.258 s]
[INFO] nifi-aws-processors ................................ FAILURE [ 30.808 s]
[INFO] nifi-aws-service-api-nar ........................... SUCCESS [ 15.162 s]
[INFO] nifi-aws-nar ....................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  50.014 s (Wall Clock)
[INFO] Finished at: 2021-04-13T06:27:18Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project nifi-aws-processors: Compilation failure
[ERROR] /home/ubuntu/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java:[486,16] error: onTrigger(ProcessContext,ProcessSessionFactory) in ConsumeKinesisStream cannot override onTrigger(ProcessContext,ProcessSessionFactory) in AbstractProcessor
[ERROR]   overridden method is final
[ERROR] 
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :nifi-aws-processors

@ChrisSamo632
Contributor Author

ChrisSamo632 commented Apr 13, 2021

@bhaveshpatelh the AbstractProcessor class has been changed as part of this PR (see earlier comments), so you need to build that from this branch in order for the AWS bundle to compile.

The easiest (but probably not fastest) thing is probably to just build the whole of nifi. For a faster build, you could try:

cd nifi-api
mvn clean install

cd ../nifi-nar-bundles/nifi-aws-bundle
mvn -T 2.0C clean install

(Note: I've not tried this, so no guarantees it will work)

@pvillard31
Contributor

Hey @ChrisSamo632 - It seems there are some build issues with the unit tests.

@ChrisSamo632
Contributor Author

> @ChrisSamo632 It seems to me that the checkpoint at shutdown does not work. I have a few uncheckpointed messages that get processed again and again when I restart the processor.
>
> I don't really understand how the graceful shutdown works in KCL, but it seems to me we have no chance to exit cleanly (I am always seeing ZOMBIE state in the record processor's shutdown callback).
> Could you please double check it?

...

@turcsanyip I'd noticed ZOMBIE logs a few times and thought it seemed a "normal" thing for KCL (from what I could tell, but note I'm hardly a Kinesis/KCL expert). Taking a look around on the internet, there are many threads about people seeing this kind of behaviour in many different versions of the KCL, for example:

A common thing between these seems to be that the KCL settings likely need tweaking depending upon one's setup. With the ability to configure most KCL settings via Dynamic Properties on the processor, it's largely open for users to figure this out I guess depending upon their setup.

That said, a common theme seems to be the need to increase the failoverTimeMillis and/or gracefulShutdownMillis KCL settings. I've therefore added those to the ConsumeKinesisStream processor's properties to encourage users to investigate/tweak as required.

Contributor

@turcsanyip turcsanyip left a comment

@ChrisSamo632 NIFI-8431 (Redundant validation of Dynamic Properties) has been merged. Could you please rebase your PR onto main and update AbstractConfigurableComponent according to that?

Please also find my comments regarding tests.

@ChrisSamo632
Contributor Author

> @ChrisSamo632 NIFI-8431 (Redundant validation of Dynamic Properties) has been merged. Could you please rebase your PR onto main and update AbstractConfigurableComponent according to that?
>
> Please also find my comments regarding tests.

@turcsanyip yep, I'd planned to as soon as I could after the PR that replicated my fix was merged (earlier today). Will include it as part of the next commit.

udaykale and others added 13 commits May 11, 2021 09:42
…sisStream; improve Session and Thread handling when initialising Kinesis Client Library Worker and processing Kinesis Records; prevent double-validation of dynamic processors (after NIFI-8266)
Set allowableValues for ConsumeKinesisStream#REPORT_CLOUDWATCH_METRICS property
…deprecated properties; improve provenance reporting)
…fields to avoid storing them and instead pull them from the Context as needed; additional logging on KCL shutdown)
Contributor

@turcsanyip turcsanyip left a comment

@ChrisSamo632 I found an error case that, in my opinion, should be handled in this round. If the configured Kinesis Stream does not exist (which is quite a typical user error), the failure is not visible in the UI. The Kinesis Worker logs error messages, but the processor looks fine. After a while the worker stops, but there is still nothing in the UI.

As far as I see, there is no straightforward way to handle worker errors (eg. pass exception handler in to the worker). WorkerStateChangeListener may be used to monitor worker state changes and log error on the processor when the worker stopped for some reason.

@ChrisSamo632
Contributor Author

@turcsanyip I've implemented a check in onTrigger to ensure the WorkerState never reaches SHUT_DOWN without the ConsumeKinesisStream processor being Stopped in NiFi - this is the only way I can see of trying to check whether the Worker has failed to initialise (it would remain at CREATED or INITIALIZING state until finally shutting down due to a configuration error).

Unit test added to show the behaviour (happily the unit tests don't connect to a Kinesis/DynamoDB instance so the Worker always fails eventually, now we throw a ProcessException to note that in the NiFi UI and allow a Bulletin to be generated).
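
A minimal sketch of this kind of check, in plain Java without the KCL or NiFi APIs. `DemoWorkerState` and `WorkerStateGuard` are hypothetical names; the enum lists only the states mentioned in this thread plus an assumed `STARTED` state, and `IllegalStateException` stands in for the ProcessException the processor throws.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical local enum; not the KCL WorkerState type.
enum DemoWorkerState { CREATED, INITIALIZING, STARTED, SHUT_DOWN }

class WorkerStateGuard {
    private final AtomicReference<DemoWorkerState> state =
            new AtomicReference<>(DemoWorkerState.CREATED);
    private volatile boolean stoppedByFramework = false;

    // Would be called from a state-change listener on the worker's thread.
    void onWorkerStateChange(DemoWorkerState newState) {
        state.set(newState);
    }

    // Would be called when the processor itself is stopped by the user.
    void markStopped() {
        stoppedByFramework = true;
    }

    // Would be called at the top of each onTrigger(): fail loudly if the
    // worker has shut down while the processor is still meant to be running.
    void check() {
        if (state.get() == DemoWorkerState.SHUT_DOWN && !stoppedByFramework) {
            throw new IllegalStateException(
                    "Worker shut down unexpectedly; check the stream name and credentials");
        }
    }
}
```

Throwing from the trigger method is what surfaces the failure to the user; a shutdown that follows a deliberate stop passes the check silently.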

Newer KCL versions allow for better handling of this scenario, but we're limited to what we can do in KCL 1.13.3 (see NIFI-8531 for upgrading to KCL 2.x and a note about improving this scenario).

…t Library Worker transitions to a SHUT_DOWN state without the processor being shutdown first
…isStream with WorkerState has unexpectedly SHUT_DOWN
Contributor

@turcsanyip turcsanyip left a comment

+1 LGTM
Merging to main.

@udaykale Thanks for the initial commit / PR.
@ChrisSamo632 Thanks for the further improvements and getting it across the finish line.

@asfgit asfgit closed this in a274c12 May 12, 2021
@anekar3416

Just wondering, which version of KCL has this fix or do we still need to use KCL 1.13.3?

@ChrisSamo632
Contributor Author

> Just wondering, which version of KCL has this fix or do we still need to use KCL 1.13.3?

@anekar3416 the KCL version was upgraded as part of NIFI-9520 in PR #5632, released in NiFi 1.16.0. The version of KCL currently used (after the linked ticket & PR) is 1.14.7 - upgrade to KCL 2.x requires larger changes to NiFi first in the future, e.g. NIFI-8287

krisztina-zsihovszki pushed a commit to krisztina-zsihovszki/nifi that referenced this pull request Jun 28, 2022
This closes apache#4822.

Co-authored-by: uday <udaygkale@gmail.com>

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
@ChrisSamo632 ChrisSamo632 deleted the NIFI-2892 branch October 21, 2022 18:36
8 participants