-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18020][Streaming][Kinesis] Checkpoint SHARD_END to finish reading closed shards #16213
Conversation
…ction, this is workaround
Test build #69859 has finished for PR 16213 at commit
|
I'm looking into the failure. |
Test build #69860 has finished for PR 16213 at commit
|
Test build #69863 has finished for PR 16213 at commit
|
Jenkins, retest this please. |
Test build #69902 has finished for PR 16213 at commit
|
*/ | ||
public class CheckpointerShim { | ||
|
||
public static void shutdown(IRecordProcessorCheckpointer checkpointer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tried this call? https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java#L129
It seems you may add a finalize
method to KinesisCheckpointer
where you call the IRecordProcessorCheckpointer.checkpoint(ExtendedSequenceNumber.SHARD_END.sequenceNumber, ExtendedSequenceNumber.SHARD_END.subSequenceNumber)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I tried by this version, but I got the same exception;
16/12/10 05:49:00 ERROR ShutdownTask: Application exception.
java.lang.IllegalArgumentException: Sequence number must be numeric, but was SHARD_END
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator.validateSequenceNumber(SequenceNumberValidator.java:75)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:120)
at org.apache.spark.streaming.kinesis.KinesisCheckpointer.removeCheckpointer(KinesisCheckpointer.scala:77)
at org.apache.spark.streaming.kinesis.KinesisReceiver.removeCheckpointer(KinesisReceiver.scala:258)
IIUC this is a kind of aws library bugs. I asked in the kinesis forum here, but I didn't get an answer.
"\nData received does not match data sent after splitting a shard") | ||
} | ||
|
||
val (shardToMerge, adjShared) = splitOpenShards match { case Seq(e1, e2) => (e1, e2) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleaner to do
val Seq(shardToMerge, adjShared) = splitOpenShards
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably you also meant adjShard
. I would use adjacentShard
. For a moment I was thinking adjective
, shared adjective, what does that mean :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I tried to use adjacentShard
, but it returned null
. I didn't why it behaved like this though, do you know that?
Left a couple comments. I don't like the idea of using KCL internal methods. Hopefully we won't need to. |
Thanks for these comments! ya, I do not like this approach, too. But, since those who reshard streams always hit this issue and resharding is important for load-balancing in Kinesis streams (recently, a new API |
Test build #69956 has finished for PR 16213 at commit
|
Test build #71260 has finished for PR 16213 at commit
|
Test build #71263 has finished for PR 16213 at commit
|
Test build #71268 has finished for PR 16213 at commit
|
@brkyvz I asked AWS guys and I found correct handling in this case (See: https://forums.aws.amazon.com/thread.jspa?threadID=244218). I fixed in this way and could you check this again? Thanks! |
@brkyvz ping |
Sorry, taking a look |
Many thanks! |
} | ||
ssc.start() | ||
|
||
val (testData1, testData2, testData3) = (1 to 10, 11 to 20, 21 to 30) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of doing a tuple expansion, mind making these new lines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay
"\nData received does not match data sent after splitting a shard") | ||
} | ||
|
||
val Seq(shardToMerge, adjShared) = splitOpenShards |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: adjShared
-> adjShard
?
This LGTM now! Thank you so much for checking with AWS and posting thread link as a comment. |
Test build #71555 has finished for PR 16213 at commit
|
@tdas ping |
1 similar comment
@tdas ping |
facing same issue randomly on prod, can I help in some way to push it ? |
// We must call `checkpoint()` with no parameter to finish reading shards. | ||
// See an URL below for details: | ||
// https://forums.aws.amazon.com/thread.jspa?threadID=244218 | ||
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you run this if checkpointer != null
. The docs mention that the checkpointer
may be null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the latest fix satisfy your intention?
@maropu One last comment. Then I'll merge this! |
okay, I'll update soon! |
LGTM! Pending tests. |
Many thanks! Also, congrats, commiter |
Test build #72007 has finished for PR 16213 at commit
|
Thanks a lot! Merging to master! (May take a while, going to be my first merge!) |
great, haha! Many thanks! |
…ing closed shards ## What changes were proposed in this pull request? This pr is to fix an issue occurred when resharding Kinesis streams; the resharding makes the KCL throw an exception because Spark does not checkpoint `SHARD_END` when finishing reading closed shards in `KinesisRecordProcessor#shutdown`. This bug finally leads to stopping subscribing new split (or merged) shards. ## How was this patch tested? Added a test in `KinesisStreamSuite` to check if it works well when splitting/merging shards. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes apache#16213 from maropu/SPARK-18020.
…ing closed shards ## What changes were proposed in this pull request? This pr is to fix an issue occurred when resharding Kinesis streams; the resharding makes the KCL throw an exception because Spark does not checkpoint `SHARD_END` when finishing reading closed shards in `KinesisRecordProcessor#shutdown`. This bug finally leads to stopping subscribing new split (or merged) shards. ## How was this patch tested? Added a test in `KinesisStreamSuite` to check if it works well when splitting/merging shards. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes apache#16213 from maropu/SPARK-18020.
…ing closed shards ## What changes were proposed in this pull request? This pr is to fix an issue occurred when resharding Kinesis streams; the resharding makes the KCL throw an exception because Spark does not checkpoint `SHARD_END` when finishing reading closed shards in `KinesisRecordProcessor#shutdown`. This bug finally leads to stopping subscribing new split (or merged) shards. ## How was this patch tested? Added a test in `KinesisStreamSuite` to check if it works well when splitting/merging shards. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes apache#16213 from maropu/SPARK-18020.
What changes were proposed in this pull request?
This pr is to fix an issue occurred when resharding Kinesis streams; the resharding makes the KCL throw an exception because Spark does not checkpoint
SHARD_END
when finishing reading closed shards inKinesisRecordProcessor#shutdown
. This bug finally leads to stopping subscribing new split (or merged) shards.How was this patch tested?
Added a test in
KinesisStreamSuite
to check if it works well when splitting/merging shards.