Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10597] Propagate BigQuery streaming insert throttled time to Dataflow worker #12403

Merged
merged 1 commit into from
Aug 21, 2020

Conversation

robinyqiu
Copy link
Contributor

@robinyqiu robinyqiu commented Jul 29, 2020

r: @chamikaramj @ihji

FYI: this change is very similar to #8973

Post-Commit Tests Status (on master branch)

Lang SDK Dataflow Flink Samza Spark Twister2
Go Build Status --- Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@robinyqiu robinyqiu changed the title [BEAM-10597] Propagate BigQuery streaming insert throttled time to Dataflow worker in Java SDK [BEAM-10597] Propagate BigQuery streaming insert throttled time to Dataflow worker Jul 29, 2020
@aaltay aaltay requested a review from chamikaramj July 29, 2020 20:49
@robinyqiu
Copy link
Contributor Author

Run Python PreCommit

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

container.tryGetCounter(
MetricName.named(
BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAME));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the same name as above ("cumulativeThrottlingSeconds") and move it to a constant (and also do ms to sec conversion when setting) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
It would be great if we could use the same constant for all three use cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we can use seconds, but on the streaming side msec is needed. That's the reason why I kept msec.

For consistency, we can change all counters to use msec originally, and do msec to sec conversion here. WDYT?

Copy link
Contributor Author

@robinyqiu robinyqiu Aug 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chamikaramj @ihji ! I have made the change such that BQ, GCS, and Datastore all report throttled time in milliseconds at the beginning, and they now share the common counter name for consistency. The millisecond to second conversion is done only when later a throttled time in seconds is expected by worker side code. PTAL

&& THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName())) {
if ((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
&& THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
|| (BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE.equals(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we needed to use a unique name for BQ but not for GCS or Datastore ?

Copy link
Contributor Author

@robinyqiu robinyqiu Aug 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. GCS and Datastore counters are only consumed by batch worker (the THROTTLING_MSECS_METRIC_NAME counter here is a separate counter; I am not sure what this is. Maybe all throttling metrics should go to this counter? @ihji I saw you have a JIRA about it, not sure if this what you want to do).

Here in the streaming case, precision is on millisecond (whereas GCS and DataStore only store seconds)

@@ -867,6 +872,7 @@ public void deleteDataset(String projectId, String datasetId)
}
try {
sleeper.sleep(nextBackoffMillis);
throttlingMilliSeconds.inc(nextBackoffMillis);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for failures. Probably you need to increment the counter for backoff1 for rate limit errors above.

cc: @ihji

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retried failures here are transient failures, which I believe include throttling. I have thought about incrementing backoff1 but that is executed in a future (a parallel thread). If we accumulate counters over all threads then I think we will over calculate the number. So I add the counter here in the main thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throttling results in rate limit errors, right ? If so that would be captured by backoff1 I think. Prob. Heejong can confirm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My major concern on accumulating backoff1 is that we may over calculate, because we will be adding time being throttled on all threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should use backoff1. Rate limit errors only reach to this point after 2 minutes of backoffs by backoff1 silently inside of the future. Why do you think it's over-calculated? Each thread is doing its own insert job and it doesn't look strange to me to calculate the total throttling time by adding all backoff times from parallel threads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should use backoff1. Rate limit errors only reach to this point after 2 minutes of backoffs by backoff1 silently inside of the future.

I see. That make sense to me.

Why do you think it's over-calculated?

Because I am not sure how this metrics is used downstream. I vaguely remember Dataflow autoscaling will use this number divided by the total time spent on work item to yield a fraction which signals throttling. If the total time is not accumulating time spent on all threads then we may over-calculate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that's a valid concern. We probably need to figure out the time requests are throttled without including backoff due to other errors. Is there a way to get throttled time from all parallel threads and just use the maximum ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to get throttled time from all parallel threads and just use the maximum?

Yes I think this is the right thing to do. Made the change already. PTAL. WDYT, @ihji?

@aaltay
Copy link
Member

aaltay commented Aug 13, 2020

@robinyqiu - What is the next step on this PR?

@robinyqiu
Copy link
Contributor Author

@robinyqiu - What is the next step on this PR?

I thought I replied to the comments but I didn't actually send it out. Thanks for the ping.

@@ -424,6 +427,9 @@ public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff)
private final PipelineOptions options;
private final long maxRowsPerBatch;
private final long maxRowBatchSize;
// aggregate the total time spent in exponential backoff
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider adding this to Python as well (in a separate PR).

cc: @pabloem

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do in a new PR.

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks.

@chamikaramj
Copy link
Contributor

Retest this please

@robinyqiu
Copy link
Contributor Author

Run Java PreCommit

@robinyqiu
Copy link
Contributor Author

Thank you for the review! I factored out the Python side change and will move it to a followup PR, for easier testing and importing.

@robinyqiu
Copy link
Contributor Author

I will merge after the tests are green. Thank you both!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants