Add GoogleAdsIO for reading from Google Ads #27681
Conversation
Codecov Report
@@ Coverage Diff @@
## master #27681 +/- ##
=======================================
Coverage 70.59% 70.59%
=======================================
Files 857 857
Lines 103944 103947 +3
=======================================
+ Hits 73375 73378 +3
Misses 29004 29004
Partials 1565 1565
Flags with carried forward coverage won't be shown. See 11 files with indirect coverage changes.
Assigning reviewers. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Run Java PreCommit
Run Python PreCommit
Linter failure
I believe
To my surprise it actually didn't catch that, but I did a search and replace to remove trailing whitespace.
* <pre>
* --googleAdsClientId=your-client-id
* --googleAdsClientSecret=your-client-secret
* --googleAdsRefreshToken=your-refresh-token
* --googleAdsDeveloperToken=your-developer-token
* </pre>
I think we should probably also link https://developers.google.com/google-ads/api/docs/oauth/overview around here, not just from the options, for more visibility.
Done.
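For context on how flags like these are typically wired up in Beam, here is a minimal sketch of a PipelineOptions interface that such flags could bind to. The interface name and option descriptions below are illustrative assumptions, not necessarily the exact GoogleAdsOptions interface added in this PR.

```java
// Illustrative sketch only: option names mirror the flags shown above, but this is not
// necessarily the exact options interface shipped with the connector.
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface ExampleGoogleAdsOptions extends PipelineOptions {
  @Description("OAuth 2.0 client ID for the Google Ads API")
  String getGoogleAdsClientId();
  void setGoogleAdsClientId(String value);

  @Description("OAuth 2.0 client secret for the Google Ads API")
  String getGoogleAdsClientSecret();
  void setGoogleAdsClientSecret(String value);

  @Description("OAuth 2.0 refresh token for the Google Ads API")
  String getGoogleAdsRefreshToken();
  void setGoogleAdsRefreshToken(String value);

  @Description("Developer token for the Google Ads API")
  String getGoogleAdsDeveloperToken();
  void setGoogleAdsDeveloperToken(String value);
}

// Typical usage at pipeline construction time:
//   ExampleGoogleAdsOptions options =
//       PipelineOptionsFactory.fromArgs(args).withValidation().as(ExampleGoogleAdsOptions.class);
```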
private static final int MAX_RETRIES = 5;
private static final FluentBackoff BACKOFF =
    FluentBackoff.DEFAULT
        .withExponent(2.0)
        .withInitialBackoff(Duration.standardSeconds(30))
        .withMaxRetries(MAX_RETRIES);
nit: Are those arbitrary or recommended somewhere? If recommended, a reference would probably make sense.
The retry configuration is an adaptation of the BQ DTS configuration. I've added a comment to reflect that these aren't entirely arbitrary.
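For readers unfamiliar with FluentBackoff, a small sketch of how a backoff like the one above is typically consumed. FluentBackoff, BackOff, BackOffUtils and Sleeper are existing Beam utilities; the surrounding retry helper is just an illustrative assumption, not code from this PR.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

class RetrySketch {
  private static final int MAX_RETRIES = 5;
  private static final FluentBackoff BACKOFF =
      FluentBackoff.DEFAULT
          .withExponent(2.0)
          .withInitialBackoff(Duration.standardSeconds(30))
          .withMaxRetries(MAX_RETRIES);

  // Retries the supplied call on IOException until the backoff is exhausted.
  static <T> T callWithRetries(Callable<T> rpc) throws Exception {
    BackOff backOff = BACKOFF.backoff();
    while (true) {
      try {
        return rpc.call();
      } catch (IOException e) {
        // BackOffUtils.next sleeps for the next interval and returns false once
        // the maximum number of retries has been reached.
        if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
          throw e;
        }
      }
    }
  }
}
```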
sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@Override
public PCollection<GoogleAdsRow> expand(PBegin input) {
  String query = getQuery();
  List<Long> customerIds = getCustomerIds();
This pattern is limiting. Because the customer IDs are specified at pipeline construction time, they cannot be dynamic.
It's likely worth reworking this IO to instead consume a PCollection of those IDs, so that they can be dynamically generated by some upstream source.
I've changed the transform to take a PCollection<String> as input. Read can now be used to issue a single query for many customer IDs, and ReadAll still provides the flexibility to alter the query per element (e.g. to support queries of the same output shape with differing WHERE clauses per customer ID).
Customer IDs are typically expressed as long, including in the Google Ads client library example code, but the protobuf schema for API requests defines customer IDs as String (oddly enough, linkedCustomerId/loginCustomerId are both long in the client...). To avoid baking assumptions about the identifier's format into the IO I think it makes sense to use String as the input type. The documentation example reflects this as well.
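To make the resulting usage concrete, here is a rough sketch of how the reworked transform might be applied. The builder method names (read(), withQuery(...), withRateLimitPolicy(...)) and the myRateLimitPolicyFactory placeholder are inferred from this thread and should be treated as assumptions rather than the verbatim final API.

```java
// Hypothetical wiring, assuming Read is a PTransform<PCollection<String>, PCollection<GoogleAdsRow>>.
PCollection<String> customerIds =
    pipeline.apply("CustomerIds", Create.of("1234567890", "9876543210"));

// Read: one GAQL query issued for each customer ID in the input collection.
PCollection<GoogleAdsRow> rows =
    customerIds.apply(
        "ReadCampaigns",
        GoogleAdsIO.v14()
            .read()
            .withQuery("SELECT campaign.id, campaign.name FROM campaign")
            // Rate limiting is deliberately left to the caller; myRateLimitPolicyFactory
            // stands in for a user-supplied policy (see the rate limit discussion below).
            .withRateLimitPolicy(myRateLimitPolicyFactory));

// ReadAll instead consumes fully-formed requests, so the query (e.g. its WHERE clause)
// can differ per element while producing rows of the same shape.
```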
String customerId = request.getCustomerId();

do {
  rateLimitPolicy.onBeforeRequest(developerToken, customerId, request);
What are the consequences of breaking the rate limit? I ask because the default rate limiter won't scale with parallel reads.
A whole lot of trouble and non-compliance fees; see https://developers.google.com/google-ads/api/docs/rate-sheet.
I've given this a bit more thought, and I believe the policy configuration needs to be owned entirely by the user, without giving the illusion of safe defaults: there simply are none we can safely recommend, given the complexity of the API's quotas and limits and their global applicability to developer tokens or customer IDs.
I will rename DefaultRateLimitPolicy to FixedRateLimitPolicy, mark it public, and leave rateLimitPolicyFactory set to null on construction. I'll clearly mark FixedRateLimitPolicy as not intended for advanced users who may exceed their global quota across multiple running applications.
Advanced users should implement their own shared token bucket service (e.g. using https://github.com/mailgun/gubernator) as per the advice provided at https://developers.google.com/google-ads/api/docs/best-practices/rate-limits#throttle.
Moving DatastoreIO's AdaptiveThrottler to a util package would offer a helpful building block for users to apply an additional layer of client-side protection against exceeding limits. I'm planning a follow-up PR to do that.
Upon reviewing DatastoreIO and FirestoreIO it occurred to me that neither of them applies a pipeline-global rate limit across all workers; instead they divide the desired pipeline-global QPS by maxNumWorkers, with a default of 1 QPS local to the DoFn.
Done. No policy is set by default, but one is still required; DefaultRateLimitPolicy has been renamed to SimpleRateLimitPolicy and simply wraps the Guava RateLimiter.
Adequate warnings have been added to the documentation to inform users that they should not rely on SimpleRateLimitPolicy for anything but low-volume and development use cases.
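For illustration, here is a sketch of what a policy wrapping Guava's RateLimiter could look like. The RateLimitPolicy interface shape below is an assumption inferred from the rateLimitPolicy.onBeforeRequest(...) call shown earlier, not the exact interface in the IO.

```java
import com.google.common.util.concurrent.RateLimiter;

// Hypothetical interface shape, inferred from the onBeforeRequest(...) call above.
interface RateLimitPolicy {
  void onBeforeRequest(String developerToken, String customerId, Object request)
      throws InterruptedException;
}

// Wraps a single local RateLimiter. This only throttles the worker it runs on, which is
// why it suits development and low-volume use but cannot enforce a global QPS across a
// parallel pipeline or across multiple applications sharing one developer token.
class SimpleRateLimitPolicySketch implements RateLimitPolicy {
  private final RateLimiter rateLimiter;

  SimpleRateLimitPolicySketch(double permitsPerSecond) {
    this.rateLimiter = RateLimiter.create(permitsPerSecond);
  }

  @Override
  public void onBeforeRequest(String developerToken, String customerId, Object request) {
    // Blocks until a permit is available, capping the request rate on this worker only.
    rateLimiter.acquire();
  }
}
```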
Let's add those links to the Javadoc to make it easier for users to implement their own rate limiter.
Done.
LGTM, thanks Steven!
Run Java PreCommit
Run SQL_Java11 PreCommit
LGTM. Let's add that additional documentation, and then I think this is good to go.
Run Java PreCommit
Run SQL_Java17 PreCommit
Adds a new IO connector to read from Google Ads.