diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json index 90cdf6e8efe2a9..c0355b7ce414c0 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c", "name": "Facebook Marketing", "dockerRepository": "airbyte/source-facebook-marketing", - "dockerImageTag": "0.2.5", + "dockerImageTag": "0.2.6", "documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing", "icon": "facebook.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c4f73ce73b1cef..30f36b167a1d68 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -96,7 +96,7 @@ - sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c name: Facebook Marketing dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.5 + dockerImageTag: 0.2.6 documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing icon: facebook.svg - sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c diff --git a/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md b/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md index fae3029ad94e63..9a3198ce3735d8 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md +++ b/airbyte-integrations/connectors/source-facebook-marketing/CHANGELOG.md @@ -3,3 +3,8 @@ ## 0.2.4 Fix an issue that caused losing Insights data from the past 28 days while incremental sync: https://github.com/airbytehq/airbyte/pull/3395 +## 0.2.5 +Allow configuring insights lookback window (#3396) + +## 0.2.6 +Fix handling call rate limit (#3525) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index c64b07e25d8d71..c7b37131b4ed94 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -1,16 +1,15 @@ -FROM airbyte/integration-base-python:0.1.1 +FROM python:3.7-slim # Bash is installed for more convenient debugging. RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* -ENV CODE_PATH="source_facebook_marketing" -ENV AIRBYTE_IMPL_MODULE="source_facebook_marketing" -ENV AIRBYTE_IMPL_PATH="SourceFacebookMarketing" - WORKDIR /airbyte/integration_code -COPY $CODE_PATH ./$CODE_PATH +COPY source_facebook_marketing ./source_facebook_marketing +COPY main.py ./ COPY setup.py ./ RUN pip install . -LABEL io.airbyte.version=0.2.5 +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.2.6 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/README.md b/airbyte-integrations/connectors/source-facebook-marketing/README.md index 5b5c4be8e5852e..b79e849a96a32b 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/README.md +++ b/airbyte-integrations/connectors/source-facebook-marketing/README.md @@ -45,10 +45,10 @@ and place them into `secrets/config.json`. ### Locally running the connector ``` -python main_dev.py spec -python main_dev.py check --config secrets/config.json -python main_dev.py discover --config secrets/config.json -python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json ``` ### Unit Tests diff --git a/airbyte-integrations/connectors/source-facebook-marketing/build.gradle b/airbyte-integrations/connectors/source-facebook-marketing/build.gradle index d3a09bc0ba8831..0540e145cdca9c 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/build.gradle +++ b/airbyte-integrations/connectors/source-facebook-marketing/build.gradle @@ -23,5 +23,4 @@ integrationTest.dependsOn("pythonIntegrationTests") dependencies { implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs) - implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/__init__.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py new file mode 100644 index 00000000000000..769343d25bd638 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py @@ -0,0 +1,49 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import json + +import pytest + + +@pytest.fixture(scope="session", name="config") +def config_fixture(): + with open("secrets/config.json", "r") as config_file: + return json.load(config_file) + + +@pytest.fixture(scope="session", name="config_with_wrong_token") +def config_with_wrong_token_fixture(config): + return {**config, "access_token": "WRONG_TOKEN"} + + +@pytest.fixture(scope="session", name="config_with_wrong_account") +def config_with_wrong_account_fixture(config): + return {**config, "account_id": "WRONG_ACCOUNT"} + + +@pytest.fixture(scope="session", name="config_with_include_deleted") +def config_with_include_deleted_fixture(config): + return {**config, "include_deleted": True} diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_client.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_client.py new file mode 100644 index 00000000000000..a3abe30dac1a9d --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_client.py @@ -0,0 +1,42 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +import pytest +from airbyte_cdk.models import AirbyteStream +from source_facebook_marketing.client import Client, FacebookAPIException + + +def test__health_check_with_wrong_token(config_with_wrong_token): + client = Client(**config_with_wrong_token) + alive, error = client.health_check() + + assert not alive + assert error == "Error: 190, Invalid OAuth access token." + + +def test__campaigns_with_wrong_token(config_with_wrong_token): + client = Client(**config_with_wrong_token) + with pytest.raises(FacebookAPIException, match="Error: 190, Invalid OAuth access token"): + next(client.read_stream(AirbyteStream(name="campaigns", json_schema={}))) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py similarity index 76% rename from airbyte-integrations/connectors/source-facebook-marketing/integration_tests/integration_test.py rename to airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py index 228dd95c85954f..bf216bc3c57b89 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py @@ -24,30 +24,16 @@ import copy -import json -from typing import List, Set, Tuple +from typing import Any, List, MutableMapping, Set, Tuple import pytest -from airbyte_protocol import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode, Type -from base_python import AirbyteLogger +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode, Type from source_facebook_marketing.source import SourceFacebookMarketing -@pytest.fixture(scope="session", name="stream_config") -def config_fixture(): - with open("secrets/config.json", "r") as config_file: - return json.load(config_file) - - -@pytest.fixture(scope="session", name="stream_config_with_include_deleted") -def config_with_include_deleted_fixture(stream_config): - patched_config = copy.deepcopy(stream_config) - patched_config["include_deleted"] = True - return patched_config - - @pytest.fixture(scope="session", name="state") -def state_fixture(): +def state_fixture() -> MutableMapping[str, MutableMapping[str, Any]]: return { "ads": {"updated_time": "2021-02-19T10:42:40-0800"}, "adsets": {"updated_time": "2021-02-19T10:42:40-0800"}, @@ -74,18 +60,18 @@ class TestFacebookMarketingSource: @pytest.mark.parametrize( "catalog_path", ["sample_files/configured_catalog_adsinsights.json", "sample_files/configured_catalog_adcreatives.json"] ) - def test_streams_outputs_records(self, catalog_path, stream_config): + def test_streams_outputs_records(self, catalog_path, config): configured_catalog = ConfiguredAirbyteCatalog.parse_file(catalog_path) - records, states = self._read_records(stream_config, configured_catalog) + records, states = self._read_records(config, configured_catalog) assert records, "should have some records returned" if configured_catalog.streams[0].sync_mode == SyncMode.incremental: assert states, "should have some states returned" @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("adsets", 1)]) - def test_streams_with_include_deleted(self, stream_name, deleted_num, stream_config_with_include_deleted, configured_catalog): + def test_streams_with_include_deleted(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog): catalog = self.slice_catalog(configured_catalog, {stream_name}) - records, states = self._read_records(stream_config_with_include_deleted, catalog) + records, states = self._read_records(config_with_include_deleted, catalog) deleted_records = list(filter(self._deleted_record, records)) assert states @@ -94,10 +80,10 @@ def test_streams_with_include_deleted(self, stream_name, deleted_num, stream_con assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned" - def test_campaign_stream_specific_deleted_id_pulled(self, stream_config_with_include_deleted, configured_catalog): + def test_campaign_stream_specific_deleted_id_pulled(self, config_with_include_deleted, configured_catalog): specific_campaign_id = "23846541919710398" catalog = self.slice_catalog(configured_catalog, {"campaigns"}) - records, _ = self._read_records(stream_config_with_include_deleted, catalog) + records, _ = self._read_records(config_with_include_deleted, catalog) is_specific_campaign_pulled = False for record in records: @@ -108,23 +94,21 @@ def test_campaign_stream_specific_deleted_id_pulled(self, stream_config_with_inc assert is_specific_campaign_pulled is True, f"campaigns stream should have a deleted campaign with id={specific_campaign_id}" @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("adsets", 1)]) - def test_streams_with_include_deleted_and_state( - self, stream_name, deleted_num, stream_config_with_include_deleted, configured_catalog, state - ): + def test_streams_with_include_deleted_and_state(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state): """Should ignore state because of include_deleted enabled""" catalog = self.slice_catalog(configured_catalog, {stream_name}) - records, states = self._read_records(stream_config_with_include_deleted, catalog, state=state) + records, states = self._read_records(config_with_include_deleted, catalog, state=state) deleted_records = list(filter(self._deleted_record, records)) assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned" @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 0), ("campaigns", 0), ("adsets", 0)]) def test_streams_with_include_deleted_and_state_with_included_deleted( - self, stream_name, deleted_num, stream_config_with_include_deleted, configured_catalog, state_with_include_deleted + self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state_with_include_deleted ): """Should keep state because of include_deleted enabled previously""" catalog = self.slice_catalog(configured_catalog, {stream_name}) - records, states = self._read_records(stream_config_with_include_deleted, catalog, state=state_with_include_deleted) + records, states = self._read_records(config_with_include_deleted, catalog, state=state_with_include_deleted) deleted_records = list(filter(self._deleted_record, records)) assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/main_dev.py b/airbyte-integrations/connectors/source-facebook-marketing/main.py similarity index 96% rename from airbyte-integrations/connectors/source-facebook-marketing/main_dev.py rename to airbyte-integrations/connectors/source-facebook-marketing/main.py index 7eebf8008b9e79..020e63fa64d087 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/main_dev.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/main.py @@ -25,7 +25,7 @@ import sys -from base_python.entrypoint import launch +from airbyte_cdk.entrypoint import launch from source_facebook_marketing import SourceFacebookMarketing if __name__ == "__main__": diff --git a/airbyte-integrations/connectors/source-facebook-marketing/requirements.txt b/airbyte-integrations/connectors/source-facebook-marketing/requirements.txt index dd447512e620ad..7b9114ed5867e1 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/requirements.txt +++ b/airbyte-integrations/connectors/source-facebook-marketing/requirements.txt @@ -1,4 +1,2 @@ # This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. --e ../../bases/airbyte-protocol --e ../../bases/base-python -e . diff --git a/airbyte-integrations/connectors/source-facebook-marketing/setup.py b/airbyte-integrations/connectors/source-facebook-marketing/setup.py index 277412bafee2f3..e849048325ebc0 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/setup.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/setup.py @@ -25,20 +25,28 @@ from setuptools import find_packages, setup +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.1", + "cached_property~=1.5", + "facebook_business~=10.0", + "pendulum~=1.2", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6", + "requests_mock~=1.8", +] + setup( name="source_facebook_marketing", description="Source implementation for Facebook Marketing.", author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=[ - "airbyte-protocol==0.0.0", - "base-python==0.0.0", - "facebook_business==10.0.0", - "backoff==1.10.0", - "pendulum==1.2.0", - "cached_property==1.5.2", - "pytest==6.1.2", - ], + install_requires=MAIN_REQUIREMENTS, package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, ) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py index 5211d71b5aa36c..5959a24e39c3ef 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/api.py @@ -26,15 +26,19 @@ import time from abc import ABC, abstractmethod from functools import partial -from typing import Any, Callable, Iterator, Mapping, MutableMapping, Sequence +from typing import Any, Callable, Iterable, Iterator, Mapping, MutableMapping, Sequence import backoff import pendulum as pendulum -from base_python.entrypoint import logger # FIXME (Eugene K): use standard logger + +# FIXME (Eugene K): use standard logger +from airbyte_cdk.entrypoint import logger +from facebook_business.adobjects.ad import Ad from facebook_business.adobjects.adreportrun import AdReportRun +from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse from facebook_business.exceptions import FacebookBadObjectError, FacebookRequestError -from .common import JobTimeoutException, deep_merge, retry_pattern +from .common import JobTimeoutException, batch, deep_merge, retry_pattern backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) @@ -176,34 +180,30 @@ class AdCreativeAPI(StreamAPI): """ entity_prefix = "adcreative" - BATCH_SIZE = 50 + batch_size = 50 def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - # Create the initial batch - api_batch = self._api._api.new_batch() + # get pending requests for each creative + requests = [creative.api_get(fields=fields, pending=True) for creative in self.read(getter=self._get_creatives)] + for requests_batch in batch(requests, size=self.batch_size): + yield from self.execute_in_batch(requests_batch) + + @backoff_policy + def execute_in_batch(self, requests: Iterable[FacebookRequest]) -> Sequence[MutableMapping[str, Any]]: records = [] - def success(response): + def success(response: FacebookResponse): records.append(response.json()) - def failure(response): + def failure(response: FacebookResponse): raise response.error() - # This loop syncs minimal AdCreative objects - for i, creative in enumerate(self.read(getter=self._get_creatives), start=1): - # Execute and create a new batch for every BATCH_SIZE added - if i % self.BATCH_SIZE == 0: - api_batch.execute() - api_batch = self._api._api.new_batch() - yield from records - records[:] = [] - - # Add a call to the batch with the full object - creative.api_get(fields=fields, batch=api_batch, success=success, failure=failure) - - # Ensure the final batch is executed + api_batch: FacebookAdsApiBatch = self._api._api.new_batch() + for request in requests: + api_batch.add_request(request, success=success, failure=failure) api_batch.execute() - yield from records + + return records @backoff_policy def _get_creatives(self, params: Mapping[str, Any]) -> Iterator: @@ -230,7 +230,7 @@ def _get_ads(self, params: Mapping[str, Any]): return self._api.account.get_ads(params=params, fields=[self.state_pk]) @backoff_policy - def _extend_record(self, ad, fields): + def _extend_record(self, ad: Ad, fields: Sequence[str]): return ad.api_get(fields=fields).export_all_data() diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/client.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/client.py index c4f8aa39740d10..f6323735f8843c 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/client.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/client.py @@ -26,10 +26,10 @@ from typing import Any, Mapping, Tuple import pendulum as pendulum -from base_python import BaseClient # FIXME (Eugene K): register logger as standard python logger -from base_python.entrypoint import logger +from airbyte_cdk.entrypoint import logger +from airbyte_cdk.sources.deprecated.client import BaseClient from cached_property import cached_property from facebook_business import FacebookAdsApi from facebook_business.adobjects import user as fb_user diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/common.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/common.py index f4cf507f80f879..dd2c7b3b63565d 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/common.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/client/common.py @@ -22,14 +22,19 @@ # SOFTWARE. # - +import json import sys +from time import sleep +from typing import Sequence import backoff -from base_python.entrypoint import logger # FIXME (Eugene K): register logger as standard python logger +import pendulum +from airbyte_cdk.entrypoint import logger # FIXME (Eugene K): register logger as standard python logger from facebook_business.exceptions import FacebookRequestError FACEBOOK_UNKNOWN_ERROR_CODE = 99 +FACEBOOK_API_CALL_LIMIT_ERROR_CODES = (4, 17, 32, 613, 8000, 80001, 80002, 80003, 80004, 80005, 80006, 80008) +DEFAULT_SLEEP_INTERVAL = pendulum.Interval(minutes=1) class FacebookAPIException(Exception): @@ -40,6 +45,33 @@ class JobTimeoutException(Exception): """Scheduled job timed out""" +def batch(iterable: Sequence, size: int = 1): + total_size = len(iterable) + for ndx in range(0, total_size, size): + yield iterable[ndx : min(ndx + size, total_size)] + + +def handle_call_rate_response(exc: FacebookRequestError) -> bool: + pause_time = DEFAULT_SLEEP_INTERVAL + platform_header = exc.http_headers().get("x-app-usage") or exc.http_headers().get("x-ad-account-usage") + if platform_header: + platform_header = json.loads(platform_header) + call_count = platform_header.get("call_count") or platform_header.get("acc_id_util_pct") + if call_count > 99: + logger.info(f"Reached platform call limit: {exc}") + + buc_header = exc.http_headers().get("x-business-use-case-usage") + buc_header = json.loads(buc_header) if buc_header else {} + for business_object_id, stats in buc_header.items(): + if stats["call_count"] > 99: + logger.info(f"Reached call limit on {stats['type']}: {exc}") + pause_time = max(pause_time, stats["estimated_time_to_regain_access"]) + logger.info(f"Sleeping for {pause_time.total_seconds()} seconds") + sleep(pause_time.total_seconds()) + + return True + + def retry_pattern(backoff_type, exception, **wait_gen_kwargs): def log_retry_attempt(details): _, exc, _ = sys.exc_info() @@ -48,6 +80,8 @@ def log_retry_attempt(details): def should_retry_api_error(exc): if isinstance(exc, FacebookRequestError): + if exc.api_error_code() in FACEBOOK_API_CALL_LIMIT_ERROR_CODES: + return handle_call_rate_response(exc) return exc.api_transient_error() or exc.api_error_subcode() == FACEBOOK_UNKNOWN_ERROR_CODE return False diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 92663356b81674..1c689ee4ac698f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -23,7 +23,7 @@ # -from base_python import BaseSource +from airbyte_cdk.sources.deprecated.base_source import BaseSource from .client import Client diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py index b25fa9b7e2e6a5..87ca1fad4dcf50 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py @@ -22,21 +22,145 @@ # SOFTWARE. # +import json +import pendulum import pytest -from airbyte_protocol import AirbyteStream -from source_facebook_marketing.client import Client, FacebookAPIException +from airbyte_cdk.models import AirbyteStream +from facebook_business import FacebookSession +from facebook_business.exceptions import FacebookRequestError +from source_facebook_marketing.client import Client -def test__health_check_with_wrong_token(): - client = Client(account_id="wrong_account", access_token="wrong_key", start_date="2019-03-03T10:00") - alive, error = client.health_check() +@pytest.fixture(scope="session", name="account_id") +def account_id_fixture(): + return "unknown_account" - assert not alive - assert error == "Error: 190, Invalid OAuth access token." +@pytest.fixture(scope="session", name="some_config") +def some_config_fixture(account_id): + return {"start_date": "2021-01-23T00:00:00Z", "account_id": f"{account_id}", "access_token": "unknown_token"} -def test__campaigns_with_wrong_token(): - client = Client(account_id="wrong_account", access_token="wrong_key", start_date="2019-03-03T10:00") - with pytest.raises(FacebookAPIException, match="Error: 190, Invalid OAuth access token"): - next(client.read_stream(AirbyteStream(name="campaigns", json_schema={}))) + +@pytest.fixture(autouse=True) +def mock_default_sleep_interval(mocker): + mocker.patch("source_facebook_marketing.client.common.DEFAULT_SLEEP_INTERVAL", return_value=pendulum.Interval(seconds=5)) + + +@pytest.fixture(name="client") +def client_fixture(some_config, requests_mock, fb_account_response): + client = Client(**some_config) + requests_mock.register_uri("GET", FacebookSession.GRAPH + "/v10.0/me/adaccounts", [fb_account_response]) + return client + + +@pytest.fixture(name="fb_call_rate_response") +def fb_call_rate_response_fixture(): + error = {"message": "(#32) Page request limit reached", "type": "OAuthException", "code": 32, "fbtrace_id": "Fz54k3GZrio"} + + headers = {"x-app-usage": json.dumps({"call_count": 28, "total_time": 25, "total_cputime": 25})} + + return { + "json": { + "error": error, + }, + "status_code": 400, + "headers": headers, + } + + +@pytest.fixture(name="fb_account_response") +def fb_account_response_fixture(account_id): + return { + "json": { + "data": [ + { + "account_id": account_id, + "id": f"act_{account_id}", + } + ], + "paging": {"cursors": {"before": "MjM4NDYzMDYyMTcyNTAwNzEZD", "after": "MjM4NDYzMDYyMTcyNTAwNzEZD"}}, + }, + "status_code": 200, + } + + +class TestBackoff: + def test_limit_reached(self, requests_mock, client, fb_call_rate_response, account_id): + """Error once, check that we retry and not fail""" + campaign_responses = [ + fb_call_rate_response, + { + "json": {"data": [{"id": 1, "updated_time": "2020-09-25T00:00:00Z"}, {"id": 2, "updated_time": "2020-09-25T00:00:00Z"}]}, + "status_code": 200, + }, + ] + + requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/v10.0/act_{account_id}/campaigns", campaign_responses) + requests_mock.register_uri("GET", FacebookSession.GRAPH + "/v10.0/1/", [{"status_code": 200}]) + requests_mock.register_uri("GET", FacebookSession.GRAPH + "/v10.0/2/", [{"status_code": 200}]) + + records = list(client.read_stream(AirbyteStream(name="campaigns", json_schema={}))) + + assert records + + def test_batch_limit_reached(self, requests_mock, client, fb_call_rate_response, account_id): + """Error once, check that we retry and not fail""" + responses = [ + fb_call_rate_response, + { + "json": { + "data": [ + { + "id": "123", + "object_type": "SHARE", + "status": "ACTIVE", + }, + { + "id": "1234", + "object_type": "SHARE", + "status": "ACTIVE", + }, + ], + "status_code": 200, + } + }, + ] + + batch_responses = [ + fb_call_rate_response, + { + "json": [ + { + "body": json.dumps({"name": "creative 1"}), + "code": 200, + }, + { + "body": json.dumps({"name": "creative 2"}), + "code": 200, + }, + ] + }, + ] + + requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/v10.0/act_{account_id}/adcreatives", responses) + requests_mock.register_uri("POST", FacebookSession.GRAPH + "/v10.0/", batch_responses) + + records = list(client.read_stream(AirbyteStream(name="adcreatives", json_schema={}))) + + assert records == [{"name": "creative 1"}, {"name": "creative 2"}] + + def test_server_error(self, requests_mock, client, account_id): + """Error once, check that we retry and not fail""" + responses = [ + {"json": {"error": {}}, "status_code": 500}, + { + "json": {"data": [{"id": 1, "updated_time": "2020-09-25T00:00:00Z"}, {"id": 2, "updated_time": "2020-09-25T00:00:00Z"}]}, + "status_code": 200, + }, + ] + + requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/v10.0/act_{account_id}/campaigns", responses) + + with pytest.raises(FacebookRequestError): + list(client.read_stream(AirbyteStream(name="campaigns", json_schema={})))