[BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream#11006
Conversation
|
Hi @clintropolis could you please take a look at this issue at your convince? I find you introduce this useful metrics in #9509. Maybe you will be more familiar with it :) |
| }); | ||
| } | ||
|
|
||
| private GetRecordsResult getRecords(String iteratorType, String offsetToUse, StreamPartition<String> partition) |
There was a problem hiding this comment.
nit: might be nice to name this method getRecordsForLag or something similar to indicate its limited purpose (the .withLimit(1) makes it probably not really useful for actually getting records, nor does it need to be because other methods are handling that)
There was a problem hiding this comment.
Changed. Thanks for your review.
clintropolis
left a comment
There was a problem hiding this comment.
i think this change makes sense, lgtm 👍
|
Hi @clintropolis Thanks for your review and approval! Job 75 |
|
@zhangyue19921010 have you been able to run the kinesis integration tests locally to see if those tests continue to pass after this change? There are some instructions on how to run them here - https://github.com/apache/druid/blob/master/integration-tests/README.md#running-a-test-that-uses-cloud |
|
Hi @suneet-s Thanks for reminding. I just run the kinesis integration tests locally and succeeded. |
|
Thanks for the contribution @zhangyue19921010 ! |
|
Thanks a lot :) @suneet-s and @clintropolis |

Fixes #11005.
Description
check the value of
AFTER_SEQUENCE_NUMBER. If empty it means there is no more new data for current stream. So just return 0L here.Key changed/added classes in this PR
KinesisRecordSupplier.javaThis PR has: