[BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp #3851
Conversation
@@ -92,6 +98,7 @@ public boolean advance() throws IOException {
      } catch (TransientKinesisException e) {
        LOG.warn("Transient exception occurred", e);
      }
      watermark = Instant.now();
@pawel-kaczmarczyk - I don't understand why we do that. Let's say we have a situation like this:
- we're really lagging behind on reading the Kinesis stream, i.e. unread records have an approximateArrivalTimestamp much earlier than Instant.now()
- we're trying to read the next record, but there are some network problems and a TransientKinesisException is thrown
- we advance the watermark to Instant.now(), although we probably shouldn't?
Good catch, it should rather be done inside the try-catch block, just after the loop. I will fix that.
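A minimal, self-contained sketch of the fix discussed above (the class, field, and `advance` signature here are hypothetical stand-ins, not the connector's real code): the watermark is updated inside the try block after a successful read, so a transient failure leaves it untouched.

```java
import java.time.Instant;

public class WatermarkOnSuccess {
    // Hypothetical stand-in for Beam's TransientKinesisException.
    static class TransientKinesisException extends Exception {}

    static Instant watermark = Instant.EPOCH;

    // Advance the watermark only after a successful read; a transient
    // failure leaves the watermark where it was.
    static void advance(boolean readFails) {
        try {
            if (readFails) {
                throw new TransientKinesisException();
            }
            // Read succeeded: safe to move the watermark forward.
            watermark = Instant.now();
        } catch (TransientKinesisException e) {
            // Transient failure: watermark stays untouched.
        }
    }

    public static void main(String[] args) {
        advance(true);
        if (!watermark.equals(Instant.EPOCH)) {
            throw new AssertionError("watermark must not move on a failed read");
        }
        advance(false);
        if (!watermark.isAfter(Instant.EPOCH)) {
            throw new AssertionError("watermark should move on a successful read");
        }
        System.out.println("ok");
    }
}
```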
Instant approximateArrivalTimestamp = currentRecord.get()
    .getApproximateArrivalTimestamp();
if (approximateArrivalTimestamp.isAfter(watermark)) {
  watermark = approximateArrivalTimestamp;
This doesn't work nicely with a scheme where we read data from shards in a round-robin fashion, because you may advance the watermark based on the first shard although you've got older data in other shards.
What about changing this scheme to actually read the shard whose records are most outdated? That would improve the whole solution greatly.
I know it's not ideal, but I would treat it as a temporary solution. I'm planning to change the way the streams are read in BEAM-2468.
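The reviewer's suggestion above could be sketched like this (a hypothetical illustration, not the connector's actual code): instead of rotating through shards round-robin, always poll the shard whose last-seen record is oldest, so the watermark tracks the slowest shard.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

public class OldestShardFirst {
    // Hypothetical per-shard state: the arrival timestamp of the last
    // record read from each shard (EPOCH if nothing was read yet).
    static int pickMostOutdatedShard(List<Instant> lastSeenPerShard) {
        int oldest = 0;
        for (int i = 1; i < lastSeenPerShard.size(); i++) {
            if (lastSeenPerShard.get(i).isBefore(lastSeenPerShard.get(oldest))) {
                oldest = i;
            }
        }
        return oldest;
    }

    public static void main(String[] args) {
        List<Instant> shards = Arrays.asList(
            Instant.ofEpochMilli(3000),
            Instant.ofEpochMilli(1000),  // the most outdated shard
            Instant.ofEpochMilli(2000));
        // Reading shard 1 first means the watermark cannot jump ahead of
        // the data still waiting in the lagging shard.
        System.out.println(pickMostOutdatedShard(shards)); // prints 1
    }
}
```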
Oh interesting, I didn't realize that the number of shards is fixed. Then at some point this connector can be considerably simplified using Splittable DoFn, by decomposing it into two DoFns: one (non-splittable) listing the shards, and the other (splittable) reading a single shard. See https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html .
The number of shards is actually not fixed, but stream resharding is not currently supported by this connector. I'm planning to fix that in https://issues.apache.org/jira/browse/BEAM-2469.
I heard about the Splittable DoFn idea some time ago but haven't delved into the details yet. Sounds really interesting!
  return response;
}

private boolean isEmptyResponseBeforeStreamEnd(GetKinesisRecordsResult response) { |
This name doesn't tell me much. Maybe something like notEnoughRecordsPulled?
The current name also emphasises that we still haven't caught up with the stream but somehow got an empty response (which can really happen). I would rather stick with the current name. What do you think?
So maybe simply: gotEmptyResponseButIsBeforeEndOfTheStream
Ok, I like that
try {
  response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
      checkpoint.getShardId());
  shardIterator = response.getNextShardIterator();
Why do you get a new shard iterator each time?
The shard iterator has to be advanced after each successful response in order to iterate over the shard. Isn't that correct?
Sorry, my bad. Please ignore my comment.
Thanks!
    shardIterator = response.getNextShardIterator();
    data.addAll(filter.apply(response.getRecords(), checkpoint));
  }
} while (response == null || gotEmptyResponseButIsBeforeEndOfTheStream(response));
Hmm, the control flow here looks a bit convoluted. Can you refactor this into a "while(true)" loop with explicit breaks on exit conditions? Currently I'm confused by seeing "while (response == null..." whereas response is being used at line 91.
Actually, I really prefer not to use breaks in the code, so that the loop is controlled in one place only. This loop is also a kind of temporary solution (or more of a workaround) for Kinesis behaviour: Kinesis can return 0 records during iteration before reaching the end of the stream, while there may still be more events waiting. The current KinesisReader.advance() logic would treat that case as reaching the end of the stream and move the watermark to the current timestamp.
I'm planning to implement Kinesis reading in background threads, so this is only a temporary loop that won't be needed anymore.
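For comparison, the while(true) refactor the reviewer suggested could look roughly like this (a hypothetical, self-contained simulation: `Response` and the queue of canned responses stand in for the real `GetKinesisRecordsResult` and the Kinesis client):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReadUntilNonEmpty {
    // Hypothetical stand-in for GetKinesisRecordsResult: number of records
    // plus how far behind the stream tip this response was.
    static class Response {
        final int recordCount;
        final long millisBehindLatest;
        Response(int recordCount, long millisBehindLatest) {
            this.recordCount = recordCount;
            this.millisBehindLatest = millisBehindLatest;
        }
    }

    static boolean gotEmptyResponseButIsBeforeEndOfTheStream(Response r) {
        return r.recordCount == 0 && r.millisBehindLatest > 0;
    }

    // while(true) variant of the do-while loop in the diff: keep polling
    // while we get empty responses that are still behind the stream tip.
    static Response readNextBatch(Deque<Response> responses) {
        while (true) {
            Response response = responses.poll();
            if (response == null) {
                continue; // simulates a transient failure: retry
            }
            if (!gotEmptyResponseButIsBeforeEndOfTheStream(response)) {
                return response;
            }
        }
    }

    public static void main(String[] args) {
        Deque<Response> fake = new ArrayDeque<>();
        fake.add(new Response(0, 500)); // empty but still behind the tip: retry
        fake.add(new Response(3, 0));   // records arrived: return
        System.out.println(readNextBatch(fake).recordCount); // prints 3
    }
}
```

Both shapes are equivalent; the do-while keeps the exit condition in one place, while the while(true) form makes each retry reason explicit.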
}

private boolean gotEmptyResponseButIsBeforeEndOfTheStream(GetKinesisRecordsResult response) {
  return response.getRecords().isEmpty() && response.getMillisBehindLatest() > 0;
Hmm is this new exit condition related to watermark handling, or is it a bugfix? I'm not sure I understand the reason for the difference between old and new behavior.
I actually answered that in my previous comment, so in short: Kinesis can return 0 records before reaching the end of the stream, and that would cause problems with the current watermark handling.
@@ -84,11 +85,17 @@ public boolean advance() throws IOException {
    for (int i = 0; i < shardIterators.size(); ++i) {
      currentRecord = shardIterators.getCurrent().next();
      if (currentRecord.isPresent()) {
        Instant approximateArrivalTimestamp = currentRecord.get()
            .getApproximateArrivalTimestamp();
        if (approximateArrivalTimestamp.isAfter(watermark)) {
So we declare that the watermark is the latest arrival timestamp seen so far. This seems like a very aggressive strategy for advancing the watermark, because it means that if records arrive even slightly out of order, they will be declared late data. PubsubIO uses a less aggressive strategy: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L88 - would it be a good idea to use something similar here? (If you believe the current PR is already a vast improvement over the state of the code before this PR, then I'm fine getting it in as-is, but it's something worth thinking about.)
It sounds like a really good idea to implement it in a similar way. I'll prepare the changes shortly.
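A minimal sketch of such a less aggressive strategy, loosely inspired by the PubsubUnboundedSource idea referenced above (the class and its parameters are hypothetical, not the actual PubsubIO or Kinesis implementation): instead of taking the maximum arrival timestamp seen so far, hold the watermark at the minimum arrival timestamp observed over a recent sampling window, so slightly out-of-order records are not immediately declared late.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

public class MinOverWindowWatermark {
    private final Duration samplePeriod;
    // (sampleTime, arrivalTimestamp) pairs, oldest first.
    private final Deque<Instant[]> samples = new ArrayDeque<>();

    MinOverWindowWatermark(Duration samplePeriod) {
        this.samplePeriod = samplePeriod;
    }

    void observe(Instant now, Instant arrivalTimestamp) {
        samples.addLast(new Instant[] {now, arrivalTimestamp});
        // Evict samples that fell out of the sampling window.
        while (samples.peekFirst()[0].isBefore(now.minus(samplePeriod))) {
            samples.removeFirst();
        }
    }

    Instant watermark() {
        // The watermark is the minimum arrival timestamp in the window.
        Instant min = Instant.MAX;
        for (Instant[] sample : samples) {
            if (sample[1].isBefore(min)) {
                min = sample[1];
            }
        }
        return min;
    }

    public static void main(String[] args) {
        MinOverWindowWatermark wm = new MinOverWindowWatermark(Duration.ofSeconds(10));
        wm.observe(Instant.ofEpochMilli(0), Instant.ofEpochMilli(100));
        // A slightly out-of-order record arrives one second later.
        wm.observe(Instant.ofEpochSecond(1), Instant.ofEpochMilli(50));
        // A max-based watermark would already be at 100 and declare the
        // second record late; the min-over-window watermark holds at 50.
        System.out.println(wm.watermark().toEpochMilli()); // prints 50
    }
}
```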
…tamp - less aggressive strategy for advancing watermark
@jkff I've just pushed new changes, could you please review?
Thanks! Mostly ready to merge (I can do the final fixups myself), but can you confirm whether you've tried running this against a real Kinesis stream and got reasonable autoscaling behavior?
    throws IOException, TransientKinesisException {
  final long timestampMs = 1000L;

  KinesisRecord firstRecord = prepareRecordMockWithArrivalTimestamp(timestampMs);
Seems like lines 135-142 can be abstracted into a nice helper method:
prepareRecordsWithArrivalTimestamps(timestampMs, 1 /* increment */, 5 /* count */);
and it can be reused in the two other tests.
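The suggested helper might look like the following (a hypothetical, self-contained sketch: plain `Long` arrival timestamps stand in for the `KinesisRecord` mocks the real test would create via `prepareRecordMockWithArrivalTimestamp`):

```java
import java.util.ArrayList;
import java.util.List;

public class PrepareRecords {
    // Hypothetical shape of the helper suggested in the review. In the real
    // test each Long would instead be a mocked KinesisRecord whose
    // getApproximateArrivalTimestamp() returns the given millis value.
    static List<Long> prepareRecordsWithArrivalTimestamps(
            long startMs, long incrementMs, int count) {
        List<Long> arrivalTimestamps = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            arrivalTimestamps.add(startMs + i * incrementMs);
        }
        return arrivalTimestamps;
    }

    public static void main(String[] args) {
        System.out.println(prepareRecordsWithArrivalTimestamps(1000L, 1, 5));
        // prints [1000, 1001, 1002, 1003, 1004]
    }
}
```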
firstIteratorStubbing.thenReturn(CustomOptional.<KinesisRecord>absent());
when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());

reader.start();
Consider using the common pattern:
for (boolean more = reader.start(); more; more = reader.advance())
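That pattern drives a reader to exhaustion in one loop header. A self-contained illustration (the `FakeReader` below is a hypothetical stand-in for an UnboundedSource reader with `start()`/`advance()`/`getCurrent()`):

```java
import java.util.Arrays;
import java.util.Iterator;

public class ReaderLoop {
    // Minimal stand-in for a reader with the start()/advance() contract:
    // both return false when there is no record to read.
    static class FakeReader {
        private final Iterator<String> it;
        private String current;
        FakeReader(Iterable<String> records) { this.it = records.iterator(); }
        boolean start() { return advance(); }
        boolean advance() {
            if (!it.hasNext()) {
                return false;
            }
            current = it.next();
            return true;
        }
        String getCurrent() { return current; }
    }

    public static void main(String[] args) {
        FakeReader reader = new FakeReader(Arrays.asList("a", "b", "c"));
        StringBuilder seen = new StringBuilder();
        // The common pattern suggested in the review:
        for (boolean more = reader.start(); more; more = reader.advance()) {
            seen.append(reader.getCurrent());
        }
        System.out.println(seen); // prints abc
    }
}
```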
…tamp - test improvements
Thanks! LGTM, merging.
Follow this checklist to help us incorporate your contribution quickly and easily:
- Make sure the commit message references the JIRA issue, e.g. [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
- Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.