Skip to content

Commit

Permalink
🐛 Source Facebook Marketing: restore reduced request record limit aft…
Browse files Browse the repository at this point in the history
…er `successful` response (#27979)
  • Loading branch information
bazarnov committed Jul 7, 2023
1 parent 845b771 commit 9675cc1
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=1.0.1
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.0.0
dockerImageTag: 1.0.1
dockerRepository: airbyte/source-facebook-marketing
githubIssueLabel: source-facebook-marketing
icon: facebook.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class MyFacebookAdsApi(FacebookAdsApi):
MAX_RATE, MAX_PAUSE_INTERVAL = (95, pendulum.duration(minutes=10))
MIN_RATE, MIN_PAUSE_INTERVAL = (85, pendulum.duration(minutes=2))

# see `_should_restore_default_page_size` method docstring for more info.
# attribute to handle the reduced request limit
request_record_limit_is_reduced: bool = False
# attribute to save whether the last api call was successful
last_api_call_is_successful: bool = False

@dataclass
class Throttle:
"""Utilization of call rate in %, from 0 to 100"""
Expand Down Expand Up @@ -139,6 +145,14 @@ def _update_insights_throttle_limit(self, response: FacebookResponse):
per_account=ads_insights_throttle.get("acc_id_util_pct", 0),
)

def _should_restore_default_page_size(self, params):
    """
    Tell whether the request `limit` should be put back to the default page size.

    The decision is based on the `request_record_limit_is_reduced` and
    `last_api_call_is_successful` flags, following the logic from `@backoff_policy`
    (common.py > `reduce_request_record_limit` and `revert_request_record_limit`).

    :param params: request params of the call; empty or missing params never trigger a restore
    :return: False when there are no params or the limit is currently reduced,
        otherwise the value of `last_api_call_is_successful`
    """
    # nothing to restore when the call carries no params at all
    if not params:
        return False
    # the limit was just reduced by the backoff handler, keep the reduced value for the retry
    if self.request_record_limit_is_reduced:
        return False
    return self.last_api_call_is_successful

@backoff_policy
def call(
self,
Expand All @@ -151,6 +165,8 @@ def call(
api_version=None,
):
"""Makes an API call, delegate actual work to parent class and handles call rates"""
if self._should_restore_default_page_size(params):
params.update(**{"limit": self.default_page_size})
response = super().call(method, path, params, headers, files, url_override, api_version)
self._update_insights_throttle_limit(response)
self._handle_call_rate_limit(response, params)
Expand All @@ -160,10 +176,14 @@ def call(
class API:
"""Simple wrapper around Facebook API"""

def __init__(self, account_id: str, access_token: str, page_size: int = 100):
    """
    Create the api client and register it as the default api of the Facebook lib.

    :param account_id: account id, stored as `self._account_id`
    :param access_token: access token passed to `MyFacebookAdsApi.init`
    :param page_size: value attached to the api client as `default_page_size`
    """
    self._account_id = account_id
    # design flaw in MyFacebookAdsApi requires such strange set of new default api instance
    self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False)
    # adding the default page size from config to the api base class
    # reference issue: https://github.com/airbytehq/airbyte/issues/25383
    self.api.default_page_size = page_size
    # set the default API client to Facebook lib.
    FacebookAdsApi.set_default_api(self.api)

@cached_property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
if config.end_date < config.start_date:
return False, "end_date must be equal or after start_date."

api = API(account_id=config.account_id, access_token=config.access_token)
api = API(account_id=config.account_id, access_token=config.access_token, page_size=config.page_size)
logger.info(f"Select account {api.account}")
except (requests.exceptions.RequestException, ValidationError, FacebookAPIException) as e:
return False, e
Expand All @@ -108,7 +108,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
config.start_date = validate_start_date(config.start_date)
config.end_date = validate_end_date(config.start_date, config.end_date)

api = API(account_id=config.account_id, access_token=config.access_token)
api = API(account_id=config.account_id, access_token=config.access_token, page_size=config.page_size)

insights_args = dict(
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,35 @@ def log_retry_attempt(details):

def reduce_request_record_limit(details):
    """
    `on_backoff` handler: halve the request `limit` param before the next retry.

    The limit is reduced only when the request has a truthy `limit` param and the
    exception currently being handled has HTTP status 500 with an error message
    listed in `error_patterns`. In that case the api instance flags are updated too.

    :param details: handler details dict; `details["kwargs"]["params"]` is read and
        modified in place, `details["args"][0]` is used as the api instance
    """
    _, exc, _ = sys.exc_info()
    # the list of error patterns to track,
    # in order to reduce the request page size and retry
    error_patterns = [
        "Please reduce the amount of data you're asking for, then retry your request",
        "An unknown error occurred",
    ]
    # `params` may be missing or passed explicitly as None
    params = details.get("kwargs", {}).get("params") or {}
    if (
        params.get("limit")
        and exc.http_status() == http.client.INTERNAL_SERVER_ERROR
        and exc.api_error_message() in error_patterns
    ):
        # reduce the existing request `limit` param by a half and retry
        params["limit"] = int(params["limit"]) // 2
        api = details.get("args")[0]
        # set the flag to the api class that the last api call failed
        api.last_api_call_is_successful = False
        # set the flag to the api class that the `limit` param was reduced
        api.request_record_limit_is_reduced = True

def revert_request_record_limit(details):
    """
    `on_success` handler: sets the internal api class flags that provide the logic
    to restore the previously reduced `limit` param.

    :param details: handler details dict; `details["args"][0]` is used as the api instance
    """
    # reference issue: https://github.com/airbytehq/airbyte/issues/25383
    api = details.get("args")[0]
    # set the flag to the api class that the last api call was successful
    api.last_api_call_is_successful = True
    # set the flag to the api class that the `limit` param is restored
    api.request_record_limit_is_reduced = False

def should_retry_api_error(exc):
if isinstance(exc, FacebookRequestError):
Expand Down Expand Up @@ -68,6 +91,7 @@ def should_retry_api_error(exc):
exception,
jitter=None,
on_backoff=[log_retry_attempt, reduce_request_record_limit],
on_success=[revert_request_record_limit],
giveup=lambda exc: not should_retry_api_error(exc),
**wait_gen_kwargs,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def fb_account_response_fixture(account_id):

@fixture(name="api")
def api_fixture(some_config, requests_mock, fb_account_response):
api = API(account_id=some_config["account_id"], access_token=some_config["access_token"])
api = API(account_id=some_config["account_id"], access_token=some_config["access_token"], page_size=100)

requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response])
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from airbyte_cdk.models import SyncMode
from facebook_business import FacebookAdsApi, FacebookSession
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams import AdAccount, AdCreatives, Campaigns, Videos
from source_facebook_marketing.streams import Activities, AdAccount, AdCreatives, Campaigns, Videos

FB_API_VERSION = FacebookAdsApi.API_VERSION

Expand Down Expand Up @@ -158,6 +158,43 @@ def test_limit_error_retry(self, fb_call_amount_data_response, requests_mock, ap
except FacebookRequestError:
assert [x.qs.get("limit")[0] for x in res.request_history] == ["100", "50", "25", "12", "6"]

def test_limit_error_retry_revert_page_size(self, requests_mock, api, account_id):
"""Alternate error and successful responses; the expected `limit` values of the recorded requests are 100, 50, 100, 50."""

error = {
"json": {
"error": {
"message": "An unknown error occurred",
"code": 1,
}
},
"status_code": 500,
}
success = {
"json": {
'data': [],
"paging": {
"cursors": {
"after": "test",
},
"next": f"https://graph.facebook.com/{FB_API_VERSION}/act_{account_id}/activities?limit=31&after=test"
}
},
"status_code": 200,
}

res = requests_mock.register_uri(
"GET",
FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/activities",
[error, success, error, success],
)

stream = Activities(api=api, start_date=pendulum.now(), end_date=pendulum.now(), include_deleted=False, page_size=100)
# NOTE(review): the assertion below sits in the `except` branch, so it is only evaluated
# when FacebookRequestError is raised; if no exception is raised nothing is checked — confirm this is intended.
try:
list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}))
except FacebookRequestError:
assert [x.qs.get("limit")[0] for x in res.request_history] == ['100', '50', '100', '50']

def test_limit_error_retry_next_page(self, fb_call_amount_data_response, requests_mock, api, account_id):
"""Unlike the previous test, this one tests the API call fail on the second or more page of a request."""
base_url = FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/advideos"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_check_connection_ok(self, api, config, logger_mock):

assert ok
assert not error_msg
api.assert_called_once_with(account_id="123", access_token="TOKEN")
api.assert_called_once_with(account_id="123", access_token="TOKEN", page_size=100)
logger_mock.info.assert_called_once_with(f"Select account {api.return_value.account}")

def test_check_connection_future_date_range(self, api, config, logger_mock):
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 1.0.1 | 2023-07-07 | [27979](https://github.com/airbytehq/airbyte/pull/27979) | Added the ability to restore the reduced request record limit after the successful retry, and handle the `unknown error` (code 99) with the retry strategy |
| 1.0.0 | 2023-07-05 | [27563](https://github.com/airbytehq/airbyte/pull/27563) | Migrate to FB SDK version 17 |
| 0.5.0 | 2023-06-26 | [27728](https://github.com/airbytehq/airbyte/pull/27728) | License Update: Elv2 |
| 0.4.3 | 2023-05-12 | [27483](https://github.com/airbytehq/airbyte/pull/27483) | Reduce replication start date by one more day |
Expand Down

0 comments on commit 9675cc1

Please sign in to comment.