From 9675cc12a2faf8041fa75bf5734dec3cdc430759 Mon Sep 17 00:00:00 2001 From: Baz Date: Fri, 7 Jul 2023 12:41:57 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Facebook=20Marketing:?= =?UTF-8?q?=20restore=20reduced=20request=20record=20limit=20after=20`succ?= =?UTF-8?q?essful`=20response=20=20(#27979)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../source-facebook-marketing/Dockerfile | 2 +- .../source-facebook-marketing/metadata.yaml | 2 +- .../source_facebook_marketing/api.py | 22 ++++++++++- .../source_facebook_marketing/source.py | 4 +- .../streams/common.py | 26 ++++++++++++- .../unit_tests/conftest.py | 2 +- .../unit_tests/test_client.py | 39 ++++++++++++++++++- .../unit_tests/test_source.py | 2 +- .../sources/facebook-marketing.md | 1 + 9 files changed, 91 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 25059beb35a857..e61cc18335b3b8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.0 +LABEL io.airbyte.version=1.0.1 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml index 820e497ceb1903..d2795b64131da1 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml +++ b/airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: api connectorType: source definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c - dockerImageTag: 1.0.0 + dockerImageTag: 1.0.1 dockerRepository: airbyte/source-facebook-marketing githubIssueLabel: source-facebook-marketing icon: facebook.svg diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py index 8897ace2b60e13..92a85fb7dd8b41 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py @@ -34,6 +34,12 @@ class MyFacebookAdsApi(FacebookAdsApi): MAX_RATE, MAX_PAUSE_INTERVAL = (95, pendulum.duration(minutes=10)) MIN_RATE, MIN_PAUSE_INTERVAL = (85, pendulum.duration(minutes=2)) + # see `_should_restore_page_size` method docstring for more info. + # attribute to handle the reduced request limit + request_record_limit_is_reduced: bool = False + # attribute to save the status of last successfull call + last_api_call_is_successful: bool = False + @dataclass class Throttle: """Utilization of call rate in %, from 0 to 100""" @@ -139,6 +145,14 @@ def _update_insights_throttle_limit(self, response: FacebookResponse): per_account=ads_insights_throttle.get("acc_id_util_pct", 0), ) + def _should_restore_default_page_size(self, params): + """ + Track the state of the `request_record_limit_is_reduced` and `last_api_call_is_successfull`, + based on the logic from `@backoff_policy` (common.py > `reduce_request_record_limit` and `revert_request_record_limit`) + """ + params = True if params else False + return params and not self.request_record_limit_is_reduced and self.last_api_call_is_successful + @backoff_policy def call( self, @@ -151,6 +165,8 @@ def call( api_version=None, ): """Makes an API call, delegate actual work to parent class and handles call rates""" + if self._should_restore_default_page_size(params): + params.update(**{"limit": self.default_page_size}) response = super().call(method, path, params, headers, files, url_override, api_version) self._update_insights_throttle_limit(response) self._handle_call_rate_limit(response, params) @@ -160,10 +176,14 @@ def call( class API: """Simple wrapper around Facebook API""" - def __init__(self, account_id: str, access_token: str): + def __init__(self, account_id: str, access_token: str, page_size: int = 100): self._account_id = account_id # design flaw in MyFacebookAdsApi requires such strange set of new default api instance self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False) + # adding the default page size from config to the api base class + # reference issue: https://github.com/airbytehq/airbyte/issues/25383 + setattr(self.api, "default_page_size", page_size) + # set the default API client to Facebook lib. FacebookAdsApi.set_default_api(self.api) @cached_property diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index cfaa33b4e015f9..a4c679fecc9ac2 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -85,7 +85,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> if config.end_date < config.start_date: return False, "end_date must be equal or after start_date." - api = API(account_id=config.account_id, access_token=config.access_token) + api = API(account_id=config.account_id, access_token=config.access_token, page_size=config.page_size) logger.info(f"Select account {api.account}") except (requests.exceptions.RequestException, ValidationError, FacebookAPIException) as e: return False, e @@ -108,7 +108,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: config.start_date = validate_start_date(config.start_date) config.end_date = validate_end_date(config.start_date, config.end_date) - api = API(account_id=config.account_id, access_token=config.access_token) + api = API(account_id=config.account_id, access_token=config.access_token, page_size=config.page_size) insights_args = dict( api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py index abe79bbf11df7b..49b2ff28d9850a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py @@ -35,12 +35,35 @@ def log_retry_attempt(details): def reduce_request_record_limit(details): _, exc, _ = sys.exc_info() + # the list of error patterns to track, + # in order to reduce the requestt page size and retry + error_patterns = [ + "Please reduce the amount of data you're asking for, then retry your request", + "An unknown error occurred", + ] if ( details.get("kwargs", {}).get("params", {}).get("limit") and exc.http_status() == http.client.INTERNAL_SERVER_ERROR - and exc.api_error_message() == "Please reduce the amount of data you're asking for, then retry your request" + and exc.api_error_message() in error_patterns ): + # reduce the existing request `limit` param by a half and retry details["kwargs"]["params"]["limit"] = int(int(details["kwargs"]["params"]["limit"]) / 2) + # set the flag to the api class that the last api call failed + details.get("args")[0].last_api_call_is_successfull = False + # set the flag to the api class that the `limit` param was reduced + details.get("args")[0].request_record_limit_is_reduced = True + + def revert_request_record_limit(details): + """ + This method is triggered `on_success` after successfull retry, + sets the internal class flags to provide the logic to restore the previously reduced + `limit` param. + """ + # reference issue: https://github.com/airbytehq/airbyte/issues/25383 + # set the flag to the api class that the last api call was ssuccessfull + details.get("args")[0].last_api_call_is_successfull = True + # set the flag to the api class that the `limit` param is restored + details.get("args")[0].request_record_limit_is_reduced = False def should_retry_api_error(exc): if isinstance(exc, FacebookRequestError): @@ -68,6 +91,7 @@ def should_retry_api_error(exc): exception, jitter=None, on_backoff=[log_retry_attempt, reduce_request_record_limit], + on_success=[revert_request_record_limit], giveup=lambda exc: not should_retry_api_error(exc), **wait_gen_kwargs, ) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py index 01a4b402103ada..ad2454b02ea432 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py @@ -49,7 +49,7 @@ def fb_account_response_fixture(account_id): @fixture(name="api") def api_fixture(some_config, requests_mock, fb_account_response): - api = API(account_id=some_config["account_id"], access_token=some_config["access_token"]) + api = API(account_id=some_config["account_id"], access_token=some_config["access_token"], page_size=100) requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response]) requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response]) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py index 74bcec4f315cfa..310546d5ec190d 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py @@ -9,7 +9,7 @@ from airbyte_cdk.models import SyncMode from facebook_business import FacebookAdsApi, FacebookSession from facebook_business.exceptions import FacebookRequestError -from source_facebook_marketing.streams import AdAccount, AdCreatives, Campaigns, Videos +from source_facebook_marketing.streams import Activities, AdAccount, AdCreatives, Campaigns, Videos FB_API_VERSION = FacebookAdsApi.API_VERSION @@ -158,6 +158,43 @@ def test_limit_error_retry(self, fb_call_amount_data_response, requests_mock, ap except FacebookRequestError: assert [x.qs.get("limit")[0] for x in res.request_history] == ["100", "50", "25", "12", "6"] + def test_limit_error_retry_revert_page_size(self, requests_mock, api, account_id): + """Error every time, check limit parameter decreases by 2 times every new call""" + + error = { + "json": { + "error": { + "message": "An unknown error occurred", + "code": 1, + } + }, + "status_code": 500, + } + success = { + "json": { + 'data': [], + "paging": { + "cursors": { + "after": "test", + }, + "next": f"https://graph.facebook.com/{FB_API_VERSION}/act_{account_id}/activities?limit=31&after=test" + } + }, + "status_code": 200, + } + + res = requests_mock.register_uri( + "GET", + FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/activities", + [error, success, error, success], + ) + + stream = Activities(api=api, start_date=pendulum.now(), end_date=pendulum.now(), include_deleted=False, page_size=100) + try: + list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={})) + except FacebookRequestError: + assert [x.qs.get("limit")[0] for x in res.request_history] == ['100', '50', '100', '50'] + def test_limit_error_retry_next_page(self, fb_call_amount_data_response, requests_mock, api, account_id): """Unlike the previous test, this one tests the API call fail on the second or more page of a request.""" base_url = FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/advideos" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py index 1b7e472f4f03a9..4fed3f200b727f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py @@ -55,7 +55,7 @@ def test_check_connection_ok(self, api, config, logger_mock): assert ok assert not error_msg - api.assert_called_once_with(account_id="123", access_token="TOKEN") + api.assert_called_once_with(account_id="123", access_token="TOKEN", page_size=100) logger_mock.info.assert_called_once_with(f"Select account {api.return_value.account}") def test_check_connection_future_date_range(self, api, config, logger_mock): diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index e13394d32f14dc..163330a7ff0e37 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -167,6 +167,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| 1.0.1 | 2023-07-07 | [27979](https://github.com/airbytehq/airbyte/pull/27979) | Added the ability to restore the reduced request record limit after the successful retry, and handle the `unknown error` (code 99) with the retry strategy | | 1.0.0 | 2023-07-05 | [27563](https://github.com/airbytehq/airbyte/pull/27563) | Migrate to FB SDK version 17 | | 0.5.0 | 2023-06-26 | [27728](https://github.com/airbytehq/airbyte/pull/27728) | License Update: Elv2 | | 0.4.3 | 2023-05-12 | [27483](https://github.com/airbytehq/airbyte/pull/27483) | Reduce replication start date by one more day |