Source Google Ads: handle page token expired exception #9812
Conversation
Codecov Report
@@            Coverage Diff            @@
##             master    #9812   +/-  ##
=========================================
  Coverage          ?   72.44%
=========================================
  Files             ?        5
  Lines             ?      323
  Branches          ?        0
=========================================
  Hits              ?      234
  Misses            ?       89
  Partials          ?        0
=========================================

Continue to review the full report at Codecov.
/test connector=connectors/source-google-ads
@@ -128,3 +130,66 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
            ]
        )
        return streams

    def _read_incremental(
A few comments about this override:
- Please, every time we override something, fill in the docstring explaining why the override is needed (especially for a private method).
- I believe we can and should do error handling at the stream level (or in the `GoogleAds` class), not here.
- The pattern looks like a typical retry case. I don't think reducing the page size is a good solution because we basically lose all progress. I propose to retry the query with the new state and continue reading. The data in the response is returned in ascending order, so there should be no problem continuing to read from where we crashed.

WDYT?
@augan-rymkhan here is an explanation of the 3rd point.
Let's say we request the data from `2010-01-01 00:00:00` to `2010-01-15 00:00:00`. We read 10 pages, but there are another 20 pages left, and we crash at `cursor_value = 2010-01-05 13:00:12`. We will then retry the query from `2010-01-05 13:00:12` to `2010-01-15 00:00:00`.
If this does not work (I don't see why it wouldn't, but...), we can retry exactly the same query but continue from the page number where we crashed (10).
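The retry-from-cursor pattern proposed here can be sketched as follows. This is an illustrative sketch only: `fetch`, `PageTokenExpired`, and the record shape are assumptions, not the connector's actual API.

```python
class PageTokenExpired(Exception):
    """Stand-in for the EXPIRED_PAGE_TOKEN error (assumed name)."""


def read_with_retry(fetch, start, end, cursor_field="date"):
    """Keep re-issuing the query from the last seen cursor value.

    `fetch(start, end)` is an assumed callable yielding records in
    ascending `cursor_field` order; because the API returns data in
    ascending order, restarting from the last cursor loses no progress.
    """
    last_cursor = start
    while True:
        try:
            for record in fetch(last_cursor, end):
                last_cursor = record[cursor_field]
                yield record
            return  # finished without the token expiring
        except PageTokenExpired:
            continue  # retry the same query from last_cursor
```

The key point is that no state is thrown away on failure: the window simply shrinks to start at the last cursor seen before the token expired.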
- I am checking whether it can be handled inside the stream class. I'll update here.
- `stream_instance.stream_slices` receives `stream_state`, which is the latest state taken from `get_updated_state`, and generates slices from that point. It doesn't lose the progress; it regenerates slices with a reduced date range starting from the latest read record's cursor value.
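The slice regeneration described here can be sketched as follows, using stdlib dates rather than pendulum; the field names (`segments.date`, `start_date`, `end_date`) and the 15-day default are illustrative assumptions.

```python
from datetime import date, timedelta


def stream_slices(stream_state, start_date, end_date, range_days=15):
    """Sketch: regenerate date-range slices starting from the latest
    cursor value in stream_state, so a retry resumes from the last
    read record instead of the original start date."""
    cursor = stream_state.get("segments.date") if stream_state else None
    current = date.fromisoformat(cursor) if cursor else start_date
    slices = []
    while current < end_date:
        chunk_end = min(current + timedelta(days=range_days), end_date)
        slices.append({"start_date": current.isoformat(),
                       "end_date": chunk_end.isoformat()})
        current = chunk_end
    return slices
```

With no saved state the whole range is chunked; with a saved cursor, slicing restarts from that cursor, which is the "doesn't lose progress" behaviour described above.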
see my comments
    Currently this method returns `start_date` and `end_date` with 15 days difference.
    """

    end_date = end_date or pendulum.yesterday(tz=time_zone)
Why is `time_zone` used only for `end_date`? And why do we need this parameter in general?
@vitaliizazmic This line was there before this change; `get_date_params` was a stream method. I just refactored it into a module-level function so that it can be called inside the `chunk_date_range` function.
end_date = end_date or pendulum.yesterday(tz=time_zone)
start_date = pendulum.parse(start_date)
if start_date > pendulum.now():
    return start_date.to_date_string(), start_date.add(days=1).to_date_string()
Could you please explain why these dates are returned in this case?
@vitaliizazmic This line is not changed by me; it's existing logic from before this PR:
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py#L105
# Fix issue #4806, start date should always be lower than end date.
if start_date.add(days=1).date() >= end_date.date():
    return start_date.add(days=1).to_date_string(), start_date.add(days=2).to_date_string()
return start_date.add(days=1).to_date_string(), end_date.to_date_string()
Could you please explain why one day is added to the start date?
The same here. It's existing logic.
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py#L110
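For reference, the existing logic linked above can be re-implemented with the stdlib (the connector itself uses pendulum; this is an illustrative sketch, not the connector's code, and the stated reason for the extra day is an assumption based on the discussion, not confirmed by the authors).

```python
from datetime import date, timedelta


def get_date_params(start_date, end_date=None):
    """Stdlib sketch of the existing pendulum-based logic."""
    end_date = end_date or date.today() - timedelta(days=1)  # "yesterday"
    start = date.fromisoformat(start_date)
    if start > date.today():
        return start.isoformat(), (start + timedelta(days=1)).isoformat()
    # Fix issue #4806: start date should always be lower than end date.
    # One day is added presumably because start_date holds the cursor of
    # the last record already synced (assumption, see the thread above).
    if start + timedelta(days=1) >= end_date:
        return ((start + timedelta(days=1)).isoformat(),
                (start + timedelta(days=2)).isoformat())
    return (start + timedelta(days=1)).isoformat(), end_date.isoformat()
```

Note how the `>=` branch forces a one-day window when the cursor is already at (or past) the day before `end_date`.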
@@ -35,16 +64,21 @@ def chunk_date_range(

    # As in to return some state when state in abnormal
    if start_date > end_date:
        return [{field: start_date.to_date_string()}]
    start, end = get_date_params(start_date.to_date_string(), time_zone=time_zone, range_days=range_days)
    return [{"start_date": start, "end_date": end}]
I believe we can avoid duplicating code by setting the dates.
@vitaliizazmic Done. I refactored this line. Thanks for this suggestion!
    for record in self.parse_response(response):
        state = self.get_updated_state(state, record)
        yield record
except GoogleAdsException as e:
I think we shouldn't use names like `e`.
@vitaliizazmic Changed var name.
start_date, end_date = parse_dates(stream_slice)
if (end_date - start_date).days == 1:
    # If range days is 1, no need in retry, because it's the minimum date range
    raise e
Will it return some description of the error?
Yes, this will be the description: `message: "Page token has expired."`.
I will add an extra log here.
    raise e
else:
    # return the control if no exception is raised
    return
In my view, it isn't a good solution.
@vitaliizazmic After reading records completes successfully, it needs to quit the loop. What solution do you suggest?
    return start_date, end_date


def get_date_params(start_date: str, time_zone=None, range_days: int = None, end_date: pendulum.datetime = None):
This doesn't have a return type annotation.
@augan-rymkhan Added return type.
try:
    response = self.google_ads_client.send_request(self.get_query(stream_slice))
    for record in self.parse_response(response):
        state = self.get_updated_state(state, record)
Not sure I understand why we need to call `get_updated_state` here.
@keu This method is called to get the cursor value from the record; I call it to be sure the cursor value is the latest state.
if start_date.add(days=1).date() >= end_date.date():
    return start_date.add(days=1).to_date_string(), start_date.add(days=2).to_date_string()
return start_date.add(days=1).to_date_string(), end_date.to_date_string()

state = stream_state or {}
The logic of this function has two parts:
- handling the error
- reading records (duplicating the logic of `stream.read` or the superclass)

Can we implement this as a retry decorator, or at least move it to a separate function?
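The decorator idea could look roughly like this. All names here (`retry_expired_token`, `PageTokenExpired`, the `start_date` slice key) are assumptions for the sketch, not the connector's real identifiers.

```python
import functools


class PageTokenExpired(Exception):
    """Assumed stand-in for GoogleAdsException with EXPIRED_PAGE_TOKEN."""


def retry_expired_token(cursor_field):
    """Sketch: wrap a read_records-style generator, track the cursor of
    each yielded record, and on token expiry retry the slice with
    start_date advanced to the last seen cursor."""
    def decorator(read_records):
        @functools.wraps(read_records)
        def wrapper(self, stream_slice, **kwargs):
            stream_slice = dict(stream_slice)  # don't mutate the caller's slice
            while True:
                try:
                    for record in read_records(self, stream_slice, **kwargs):
                        # remember the latest cursor so a retry resumes here
                        stream_slice["start_date"] = record[cursor_field]
                        yield record
                    return
                except PageTokenExpired:
                    continue  # re-issue the query from the updated start_date
        return wrapper
    return decorator
```

This keeps the retry concern out of the reading logic, which is the separation the comment above asks for.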
@keu Refactored this method; now it calls the parent's `read_records` method. I am not sure that using a decorator would improve this code.
see my comments
Let it go, as it is an on-call issue.
/publish connector=connectors/source-google-ads
What
Resolves On-Call #103 "Page token has expired."
Reducing the date range twofold (from 1 month to 15 days) did not help. If there is a huge amount of data in that date range, processing can take more than 2 hours, and the page token expires after that time.
How
Override the `read_records` method in the `IncrementalGoogleAdsStream` so that it handles `GoogleAdsException` with the `EXPIRED_PAGE_TOKEN` error code, updates the `start_date` key in the `stream_slice` with the latest read record's cursor value, and then retries the sync.
The first attempt
The second attempt
If the connector couldn't read all the records within one day, it would enter an infinite loop, so stop the sync with an error.
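A simplified sketch of the override described above (stdlib dates, a stand-in exception instead of `GoogleAdsException`, and illustrative method and field names; not the connector's actual code):

```python
from datetime import date


class ExpiredPageToken(Exception):
    """Stand-in for GoogleAdsException with the EXPIRED_PAGE_TOKEN code."""


class IncrementalStreamSketch:
    cursor_field = "segments.date"

    def read_records_once(self, stream_slice):
        """One attempt at reading the slice; may raise ExpiredPageToken."""
        raise NotImplementedError

    def read_records(self, stream_slice):
        while True:
            try:
                for record in self.read_records_once(stream_slice):
                    # remember the latest cursor so a retry resumes here
                    stream_slice["start_date"] = record[self.cursor_field]
                    yield record
                return
            except ExpiredPageToken:
                start = date.fromisoformat(stream_slice["start_date"])
                end = date.fromisoformat(stream_slice["end_date"])
                if (end - start).days <= 1:
                    # a one-day range cannot shrink further: re-raise to
                    # stop the sync instead of looping forever
                    raise
                # otherwise retry the slice from the latest read cursor
```

The one-day guard is the infinite-loop protection mentioned above: when even a single day of data cannot be read before the token expires, the sync fails with the original error.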
Refactored `chunk_date_range`; now it returns stream slices in the following format:

Recommended reading order
1. `source-google-ads/source_google_ads/streams.py`
2. `source-google-ads/unit_tests/test_streams.py`
3. `unit_tests/test_google_ads.py`
4. `source-google-ads/unit_tests/test_source.py`
5. `source_google_ads/custom_query_stream.py`