[BEAM-2455] Backlog size retrieval for Kinesis source #3551

Closed

pawel-kaczmarczyk wants to merge 2 commits into apache:master from ocadotechnology:kinesis_backlog_size

Conversation

@pawel-kaczmarczyk
Contributor

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify.
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@pawel-kaczmarczyk
Contributor Author

@jbonofre @davorbonaci @jkff, can you please take a look?

@jbonofre
Member

R: @jbonofre

@coveralls

Coverage Status

Coverage increased (+0.01%) to 70.691% when pulling 84974b9 on ocadotechnology:kinesis_backlog_size into 91c7d3d on apache:master.

Contributor

@jkff left a comment

Thank you! Leaving the "meat" of the review to @jbonofre, just a note on backward compatibility.

.setStreamName(streamName)
.setInitialPosition(
new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
public Read withStreamName(String streamName) {
Contributor

This is a backwards-incompatible change to a public API: it will cause user code to stop compiling. Beam has had an API-stable release, so we can no longer make incompatible changes: please revert this one, or make it backwards-compatible. Same applies to other renames and signature changes in this PR.

Contributor Author

Thanks @jkff. I thought there was still time to shape the API of the Kinesis connector as it's still at an early stage of development. It also has the @Experimental(Experimental.Kind.SOURCE_SINK) annotation, which states that there might be incompatible changes to the API.

So in case we want to keep it backward-compatible, I would suggest marking the old "from" methods as deprecated and keeping the newly added "with" methods (as per your suggestion in https://issues.apache.org/jira/browse/BEAM-1428).

There was also a rename of the KinesisClientProvider interface to AWSClientsProvider. This one fortunately shouldn't break anything, as KinesisClientProvider was not public (although it should be, as was fairly pointed out in #3540). So the interface, along with the corresponding builder method "withAWSClientsProvider", can be safely renamed.

What do you think?
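
A minimal, self-contained sketch of the deprecation approach suggested above: keep the old builder method as a deprecated delegate to the new "with"-style method. The ReadSketch class and the old streamName(...) method name are hypothetical illustrations; only withStreamName appears in this PR's diff.

/**
 * Sketch only (not KinesisIO's actual code). The old entry point stays
 * source-compatible by delegating to the new "with"-style method and carrying
 * a deprecation notice. ReadSketch and streamName(...) are hypothetical names.
 */
public class ReadSketch {
  private final String streamName;

  private ReadSketch(String streamName) {
    this.streamName = streamName;
  }

  public static ReadSketch create() {
    return new ReadSketch(null);
  }

  public String getStreamName() {
    return streamName;
  }

  /** New-style builder method that the connector keeps going forward. */
  public ReadSketch withStreamName(String streamName) {
    return new ReadSketch(streamName);
  }

  /**
   * Old-style builder method retained only for backward compatibility.
   *
   * @deprecated use {@link #withStreamName(String)} instead
   */
  @Deprecated
  public ReadSketch streamName(String streamName) {
    return withStreamName(streamName);
  }
}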

Contributor

Oh right, I didn't notice it was marked Experimental. I guess then it's fine to proceed with the rest of the changes in this PR; I defer to JB for further review.

@pawel-kaczmarczyk
Contributor Author

Hi @jbonofre, did you have a chance to take a look at this PR?

@jbonofre
Member

On it pretty soon.

@pawel-kaczmarczyk
Contributor Author

Hi @jbonofre, it's just a reminder ;)

@reuvenlax
Contributor

R: @rangadi can you review this?

* When this limit is exceeded the runner might decide to scale the amount of resources
* allocated to the pipeline in order to speed up ingestion.
*/
public Read withAllowedRecordLateness(Duration allowedRecordLateness) {
Contributor

AllowedLateness in Beam refers to event time (i.e. how late a record's event time can be compared to the current watermark). Here it looks like you want to let the user provide some buffer before the source starts counting the backlog. Could you rename it?
Looks like the current implementation essentially disables backlog if this is > 0, right?

Contributor Author

As I wrote in the general comment, this parameter should disable scaling up when events are late by no more than allowedRecordLateness. So maybe I will rename it to "upToDateThreshold" so it isn't confused with the AllowedLateness concept in Beam. What do you think?

You are right that the current implementation will actually never check the backlog size, due to the way the reader's watermark is implemented. I'm planning to fix this in https://issues.apache.org/jira/browse/BEAM-2467. Maybe I should have started the work with that other ticket, but I noticed the dependency after submitting this PR. I've also reported 2 other improvements for this Kinesis source that will be submitted once these 2 are ready.

Contributor

Yes, a different name would be better. threshold suffix seems appropriate.

The order of fixes could be either; whichever is more convenient for you. If this is going in first, please add a TODO comment here noting that the feature does not work until the watermark is implemented as in the JIRA.
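
A hedged sketch of the gating idea discussed in this thread, using the upToDateThreshold name settled on above and Joda-Time (which Beam uses); the surrounding reader class is omitted and the class and method names here are illustrative assumptions.

import org.joda.time.Duration;
import org.joda.time.Instant;

/** Sketch only: report zero backlog while the watermark is within upToDateThreshold of now. */
class BacklogGateSketch {
  private final Duration upToDateThreshold;

  BacklogGateSketch(Duration upToDateThreshold) {
    this.upToDateThreshold = upToDateThreshold;
  }

  /**
   * Returns 0 while the pipeline is considered "up to date", so a runner's autoscaler
   * is not triggered by records that are only slightly behind real time.
   */
  long gatedBacklogBytes(Instant watermark, long actualBacklogBytes) {
    if (watermark.plus(upToDateThreshold).isAfterNow()) {
      return 0L;
    }
    return actualBacklogBytes;
  }
}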

}
long backlogBytes = lastBacklogBytes;
try {
backlogBytes = kinesis.getBacklogBytes(source.getStreamName(), watermark);
Contributor

@rangadi Aug 14, 2017

This method might be called quite often. Since the stats are updated only once every minute, you could cache the values for a few seconds. Since getWatermark() returns now, does getBacklogBytes ever return non-zero?

Contributor Author

The getWatermark() method must be fixed in order to enable backlog size counting, as I mentioned in my previous comment.
Do you think caching is essential now? Setting the allowedRecordLateness parameter to some reasonable value should significantly reduce the number of calls here.

Contributor

I would think so, but you can postpone it with a comment noting the performance concern. With backlogThreshold (lateness), this might be moot when the pipeline is up to date, but the extra latency penalty kicks in when the pipeline already has a high backlog, which can exacerbate the problem that caused the delay in the first place.
Caching requires just storing the last backlog value and request time in the local transient state of the reader.
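
A minimal sketch of the caching idea described above: store the last backlog value and the last request time in the reader's transient state and only re-query after a short interval. The interval, field, and interface names are assumptions rather than this PR's final code.

import org.joda.time.Duration;
import org.joda.time.Instant;

/** Sketch only: avoid issuing one CloudWatch RPC per getTotalBacklogBytes() call. */
class CachedBacklogSketch {
  // Kinesis/CloudWatch stats update roughly once per minute, so a short cache
  // interval loses little precision. The 30-second value here is an assumption.
  private static final Duration BACKLOG_CHECK_INTERVAL = Duration.standardSeconds(30);

  private long lastBacklogBytes = 0L;
  private Instant backlogLastCheckTime = new Instant(0L);

  /** Hypothetical stand-in for the CloudWatch-backed call used in this PR. */
  interface BacklogFetcher {
    long fetchBacklogBytes();
  }

  long getTotalBacklogBytes(BacklogFetcher fetcher) {
    if (backlogLastCheckTime.plus(BACKLOG_CHECK_INTERVAL).isAfterNow()) {
      // The cached value is still fresh; skip the RPC.
      return lastBacklogBytes;
    }
    lastBacklogBytes = fetcher.fetchBacklogBytes();
    backlogLastCheckTime = Instant.now();
    return lastBacklogBytes;
  }
}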

Contributor

@rangadi left a comment

Thanks for the PR. Could you add some description of what this does and any relevant design decisions? The linked JIRA does not have much info either.

@pawel-kaczmarczyk
Contributor Author

Thanks for your review @rangadi.

The only purpose of this backlog size counting is to enable autoscaling. The Kinesis API does not provide any information about the remaining events in the stream, so I had to use the CloudWatch service to get that info. I've also introduced the allowedRecordLateness parameter to limit autoscaling when events are only slightly late. It helped in our case, where we observed worker count fluctuations due to temporary event bursts.
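
For context, a hedged sketch of deriving the backlog from CloudWatch as the comment above describes: sum the stream's IncomingBytes metric between the reader's watermark and now (AWS SDK for Java v1). The class name and wiring are assumptions; only the general approach comes from this PR.

import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
import java.util.Date;
import org.joda.time.Instant;

/** Sketch only: approximate the backlog as bytes written to the stream since the watermark. */
class CloudWatchBacklogSketch {
  private final AmazonCloudWatch cloudWatch;

  CloudWatchBacklogSketch(AmazonCloudWatch cloudWatch) {
    this.cloudWatch = cloudWatch;
  }

  long getBacklogBytes(String streamName, Instant countSince) {
    GetMetricStatisticsRequest request = new GetMetricStatisticsRequest()
        .withNamespace("AWS/Kinesis")
        .withMetricName("IncomingBytes")
        .withDimensions(new Dimension().withName("StreamName").withValue(streamName))
        .withStartTime(countSince.toDate())
        .withEndTime(new Date())
        .withPeriod(60)  // CloudWatch publishes these stream metrics at one-minute granularity
        .withStatistics("Sum");
    long totalBytes = 0L;
    for (Datapoint datapoint : cloudWatch.getMetricStatistics(request).getDatapoints()) {
      totalBytes += datapoint.getSum().longValue();
    }
    return totalBytes;
  }
}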

@rangadi
Contributor

rangadi commented Aug 16, 2017

Replied inline to the comments in the code. I do see why you have 'threshold'; my comment was mainly about the 'lateness' in the name.

@pawel-kaczmarczyk
Contributor Author

OK, I've just applied the suggested changes. Can you please take a look?

Contributor

@rangadi left a comment

LGTM.

@pawel-kaczmarczyk
Contributor Author

Great, thanks @rangadi! So who can now merge this? @jbonofre?

@rangadi
Contributor

rangadi commented Aug 18, 2017

Hi @pawel-kaczmarczyk, thanks for the updates. Could you ask someone familiar with KinesisIO to +1 the overall solution? I don't know much about Kinesis.

@pawel-kaczmarczyk
Contributor Author

@dhalperi @jkff Could you please review this PR? I'm not sure who else is familiar with this connector.

* speed up ingestion.
*
* <p>
* TODO: This feature will not work properly with current {@link KinesisReader#getWatermark()}
Contributor

Not sure I understand this comment. It seems to say that the feature is currently working incorrectly - in that case, I'd suggest removing the feature and re-adding it when it's possible to make it work correctly, because there's no benefit to the user in having the API expose a feature that doesn't work. However, you say above that you've been running this and it was working for you. Can you clarify?

Contributor Author

Thanks @jkff for your review. This feature works for us in our custom Kinesis connector implementation for the Dataflow SDK, where we have implemented some features required for production usage. Now I'm trying to port these features to the Beam SDK (BEAM-2455, BEAM-2467, BEAM-2468, BEAM-2469). We already have all the required code, but it just seemed a better option to split the changes into a few pull requests for clarity. Now I see that I should have submitted BEAM-2467 before this one, but as I explained to Raghu, I noticed the dependency after submitting this PR.

So if it's not a problem, I would leave this feature with a TODO comment that will be resolved in BEAM-2467.

Contributor

Thanks. If the current PR does not work at all until BEAM-2467 is implemented, then let us revisit this PR after it is implemented: I don't see value to a user in committing this PR before then. Is there currently a PR in review for BEAM-2467? I couldn't find one.

Contributor Author

@jkff I just submitted a PR for BEAM-2467 here: #3851
So let's try to close that one quickly and I'll get back to this one.

if (backlogBytesLastCheckTime.plus(backlogBytesCheckThreshold).isAfterNow()) {
return lastBacklogBytes;
}
long backlogBytes = lastBacklogBytes;
Contributor

Not sure why you need this: why not just do lastBacklogBytes = kinesis.getBacklogBytes at line 178, and get rid of the backlogBytes variable and line 185?

Contributor Author

It just seemed clearer, but it's not needed. I'll apply your suggestion.

@jkff
Contributor

jkff commented Aug 24, 2017

Also - @rtshadow wrote the original Kinesis connector; maybe he can help with this review too?

@coveralls

Coverage Status

Coverage decreased (-0.7%) to 69.972% when pulling dd328a3 on ocadotechnology:kinesis_backlog_size into 91c7d3d on apache:master.

public long getTotalBacklogBytes() {
Instant watermark = getWatermark();
if (watermark.plus(upToDateThreshold).isAfterNow()) {
return 0L;
Contributor

@pawel-kaczmarczyk I don't understand why we cannot simply return the actual number of backlog bytes. It should be up to the execution service to decide whether that number is high enough that scaling should occur.

Contributor Author

We can simply return the backlog bytes, and it works well in most cases. However, in our case we observed unnecessary scaling during short spikes. There were also issues when there were not many events in the stream backlog but Kinesis was slow to return them. In such a situation events are lagging behind only by a minute or two, but the runner may still decide to scale up.

Contributor

Fair enough.
Can you please write that explanation in the comment in the code? Otherwise it's hard to understand why it is here.
Also, I'm not a big fan of "upToDateThreshold", but I can't really come up with anything much better.

Contributor

This sounds like something that should be fixed in the Dataflow Runner's streaming autoscaling logic (which, conveniently, is @rangadi's realm :) ), rather than in KinesisIO - though I'm ok keeping it as a temporary workaround while Dataflow Runner is being fixed. Could you file a JIRA explaining the behavior you observed from Dataflow runner's autoscaling, and link it from here, with a TODO to remove the knob?

Also @rangadi can you confirm that according to your understanding this knob is in fact conceptually a good workaround for the poor behavior of streaming autoscaling in this case?

Contributor

Sort of, yes. In Dataflow autoscaling, some of the important parameters are not tunable. E.g. it upscales if the estimated time for the remaining work is longer than 15 seconds. Not all pipelines might be this sensitive. Also, in this case the actual backlog is reported by another aggregator of metrics, which seems to update only once per minute.

This is a reasonable workaround. I'm not a fan of the method name either, but I don't have a much better suggestion.

if (watermark.plus(upToDateThreshold).isAfterNow()) {
return 0L;
}
if (backlogBytesLastCheckTime.plus(backlogBytesCheckThreshold).isAfterNow()) {
Contributor

Is this method called so often that we need to cache the response from CloudWatch?

Contributor Author

I'm not sure how often this gets called in other runners, but it wasn't a problem in the case of Dataflow. This was suggested by @rangadi as a precaution.

Contributor

Another important aspect is that the comment implied this backlog is updated on Kinesis only at minute granularity. Fetching more often does not have any advantage.

Dataflow calls the getWatermark() method after every "bundle", which could take only a few milliseconds or at most on the order of a second. Assuming fetching the backlog adds latency due to an RPC to an external system, it could severely affect performance without providing any benefit.

@pawel-kaczmarczyk
Contributor Author

Since #3851 is already merged, we can proceed with this one. I've rebased this PR on top of master and removed the TODO comment. @jkff could you please take a look?

@coveralls

Coverage Status

Coverage increased (+0.01%) to 69.538% when pulling e1d1959 on ocadotechnology:kinesis_backlog_size into 352f106 on apache:master.

Contributor

@jkff left a comment

Thanks, merging.

asfgit closed this in 7db0f13 on Sep 30, 2017