Skip to content

Commit

Permalink
Source Facebook: rate limit not always handled #3525 (#3600)
Browse files Browse the repository at this point in the history
* fix backoff of batch requests

* fix backoff of batch requests

* fix tests

* fix tests

* fix requirements

* fix tests

* fix tests

* sort codes

* format

* fix setup

* fix integration tests

* bump version

* fix integration tests

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
  • Loading branch information
keu and eugene-kulak committed May 26, 2021
1 parent 2102c49 commit 008e22e
Show file tree
Hide file tree
Showing 18 changed files with 337 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.5",
"dockerImageTag": "0.2.6",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing",
"icon": "facebook.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.5
dockerImageTag: 0.2.6
documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing
icon: facebook.svg
- sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@
## 0.2.4
Fix an issue that caused losing Insights data from the past 28 days while incremental sync: https://github.com/airbytehq/airbyte/pull/3395

## 0.2.5
Allow configuring insights lookback window (#3396)

## 0.2.6
Fix handling call rate limit (#3525)
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
FROM airbyte/integration-base-python:0.1.1
FROM python:3.7-slim

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

ENV CODE_PATH="source_facebook_marketing"
ENV AIRBYTE_IMPL_MODULE="source_facebook_marketing"
ENV AIRBYTE_IMPL_PATH="SourceFacebookMarketing"

WORKDIR /airbyte/integration_code
COPY $CODE_PATH ./$CODE_PATH
COPY source_facebook_marketing ./source_facebook_marketing
COPY main.py ./
COPY setup.py ./
RUN pip install .

LABEL io.airbyte.version=0.2.5
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.6
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ and place them into `secrets/config.json`.

### Locally running the connector
```
python main_dev.py spec
python main_dev.py check --config secrets/config.json
python main_dev.py discover --config secrets/config.json
python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```

### Unit Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ integrationTest.dependsOn("pythonIntegrationTests")

dependencies {
implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs)
implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs)
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


import json

import pytest


@pytest.fixture(scope="session", name="config")
def config_fixture():
with open("secrets/config.json", "r") as config_file:
return json.load(config_file)


@pytest.fixture(scope="session", name="config_with_wrong_token")
def config_with_wrong_token_fixture(config):
return {**config, "access_token": "WRONG_TOKEN"}


@pytest.fixture(scope="session", name="config_with_wrong_account")
def config_with_wrong_account_fixture(config):
return {**config, "account_id": "WRONG_ACCOUNT"}


@pytest.fixture(scope="session", name="config_with_include_deleted")
def config_with_include_deleted_fixture(config):
return {**config, "include_deleted": True}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#


import pytest
from airbyte_cdk.models import AirbyteStream
from source_facebook_marketing.client import Client, FacebookAPIException


def test__health_check_with_wrong_token(config_with_wrong_token):
client = Client(**config_with_wrong_token)
alive, error = client.health_check()

assert not alive
assert error == "Error: 190, Invalid OAuth access token."


def test__campaigns_with_wrong_token(config_with_wrong_token):
client = Client(**config_with_wrong_token)
with pytest.raises(FacebookAPIException, match="Error: 190, Invalid OAuth access token"):
next(client.read_stream(AirbyteStream(name="campaigns", json_schema={})))
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,16 @@


import copy
import json
from typing import List, Set, Tuple
from typing import Any, List, MutableMapping, Set, Tuple

import pytest
from airbyte_protocol import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode, Type
from base_python import AirbyteLogger
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode, Type
from source_facebook_marketing.source import SourceFacebookMarketing


@pytest.fixture(scope="session", name="stream_config")
def config_fixture():
with open("secrets/config.json", "r") as config_file:
return json.load(config_file)


@pytest.fixture(scope="session", name="stream_config_with_include_deleted")
def config_with_include_deleted_fixture(stream_config):
patched_config = copy.deepcopy(stream_config)
patched_config["include_deleted"] = True
return patched_config


@pytest.fixture(scope="session", name="state")
def state_fixture():
def state_fixture() -> MutableMapping[str, MutableMapping[str, Any]]:
return {
"ads": {"updated_time": "2021-02-19T10:42:40-0800"},
"adsets": {"updated_time": "2021-02-19T10:42:40-0800"},
Expand All @@ -74,18 +60,18 @@ class TestFacebookMarketingSource:
@pytest.mark.parametrize(
"catalog_path", ["sample_files/configured_catalog_adsinsights.json", "sample_files/configured_catalog_adcreatives.json"]
)
def test_streams_outputs_records(self, catalog_path, stream_config):
def test_streams_outputs_records(self, catalog_path, config):
configured_catalog = ConfiguredAirbyteCatalog.parse_file(catalog_path)
records, states = self._read_records(stream_config, configured_catalog)
records, states = self._read_records(config, configured_catalog)

assert records, "should have some records returned"
if configured_catalog.streams[0].sync_mode == SyncMode.incremental:
assert states, "should have some states returned"

@pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("adsets", 1)])
def test_streams_with_include_deleted(self, stream_name, deleted_num, stream_config_with_include_deleted, configured_catalog):
def test_streams_with_include_deleted(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog):
catalog = self.slice_catalog(configured_catalog, {stream_name})
records, states = self._read_records(stream_config_with_include_deleted, catalog)
records, states = self._read_records(config_with_include_deleted, catalog)
deleted_records = list(filter(self._deleted_record, records))

assert states
Expand All @@ -94,10 +80,10 @@ def test_streams_with_include_deleted(self, stream_name, deleted_num, stream_con

assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned"

def test_campaign_stream_specific_deleted_id_pulled(self, stream_config_with_include_deleted, configured_catalog):
def test_campaign_stream_specific_deleted_id_pulled(self, config_with_include_deleted, configured_catalog):
specific_campaign_id = "23846541919710398"
catalog = self.slice_catalog(configured_catalog, {"campaigns"})
records, _ = self._read_records(stream_config_with_include_deleted, catalog)
records, _ = self._read_records(config_with_include_deleted, catalog)

is_specific_campaign_pulled = False
for record in records:
Expand All @@ -108,23 +94,21 @@ def test_campaign_stream_specific_deleted_id_pulled(self, stream_config_with_inc
assert is_specific_campaign_pulled is True, f"campaigns stream should have a deleted campaign with id={specific_campaign_id}"

@pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("adsets", 1)])
def test_streams_with_include_deleted_and_state(
self, stream_name, deleted_num, stream_config_with_include_deleted, configured_catalog, state
):
def test_streams_with_include_deleted_and_state(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state):
"""Should ignore state because of include_deleted enabled"""
catalog = self.slice_catalog(configured_catalog, {stream_name})
records, states = self._read_records(stream_config_with_include_deleted, catalog, state=state)
records, states = self._read_records(config_with_include_deleted, catalog, state=state)
deleted_records = list(filter(self._deleted_record, records))

assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned"

@pytest.mark.parametrize("stream_name, deleted_num", [("ads", 0), ("campaigns", 0), ("adsets", 0)])
def test_streams_with_include_deleted_and_state_with_included_deleted(
self, stream_name, deleted_num, stream_config_with_include_deleted, configured_catalog, state_with_include_deleted
self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state_with_include_deleted
):
"""Should keep state because of include_deleted enabled previously"""
catalog = self.slice_catalog(configured_catalog, {stream_name})
records, states = self._read_records(stream_config_with_include_deleted, catalog, state=state_with_include_deleted)
records, states = self._read_records(config_with_include_deleted, catalog, state=state_with_include_deleted)
deleted_records = list(filter(self._deleted_record, records))

assert len(deleted_records) == deleted_num, f"{stream_name} should have {deleted_num} deleted records returned"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import sys

from base_python.entrypoint import launch
from airbyte_cdk.entrypoint import launch
from source_facebook_marketing import SourceFacebookMarketing

if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
-e ../../bases/airbyte-protocol
-e ../../bases/base-python
-e .
26 changes: 17 additions & 9 deletions airbyte-integrations/connectors/source-facebook-marketing/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,28 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"cached_property~=1.5",
"facebook_business~=10.0",
"pendulum~=1.2",
]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest-mock~=3.6",
"requests_mock~=1.8",
]

setup(
name="source_facebook_marketing",
description="Source implementation for Facebook Marketing.",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=[
"airbyte-protocol==0.0.0",
"base-python==0.0.0",
"facebook_business==10.0.0",
"backoff==1.10.0",
"pendulum==1.2.0",
"cached_property==1.5.2",
"pytest==6.1.2",
],
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@
import time
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Callable, Iterator, Mapping, MutableMapping, Sequence
from typing import Any, Callable, Iterable, Iterator, Mapping, MutableMapping, Sequence

import backoff
import pendulum as pendulum
from base_python.entrypoint import logger # FIXME (Eugene K): use standard logger

# FIXME (Eugene K): use standard logger
from airbyte_cdk.entrypoint import logger
from facebook_business.adobjects.ad import Ad
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
from facebook_business.exceptions import FacebookBadObjectError, FacebookRequestError

from .common import JobTimeoutException, deep_merge, retry_pattern
from .common import JobTimeoutException, batch, deep_merge, retry_pattern

backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)

Expand Down Expand Up @@ -176,34 +180,30 @@ class AdCreativeAPI(StreamAPI):
"""

entity_prefix = "adcreative"
BATCH_SIZE = 50
batch_size = 50

def list(self, fields: Sequence[str] = None) -> Iterator[dict]:
# Create the initial batch
api_batch = self._api._api.new_batch()
# get pending requests for each creative
requests = [creative.api_get(fields=fields, pending=True) for creative in self.read(getter=self._get_creatives)]
for requests_batch in batch(requests, size=self.batch_size):
yield from self.execute_in_batch(requests_batch)

@backoff_policy
def execute_in_batch(self, requests: Iterable[FacebookRequest]) -> Sequence[MutableMapping[str, Any]]:
records = []

def success(response):
def success(response: FacebookResponse):
records.append(response.json())

def failure(response):
def failure(response: FacebookResponse):
raise response.error()

# This loop syncs minimal AdCreative objects
for i, creative in enumerate(self.read(getter=self._get_creatives), start=1):
# Execute and create a new batch for every BATCH_SIZE added
if i % self.BATCH_SIZE == 0:
api_batch.execute()
api_batch = self._api._api.new_batch()
yield from records
records[:] = []

# Add a call to the batch with the full object
creative.api_get(fields=fields, batch=api_batch, success=success, failure=failure)

# Ensure the final batch is executed
api_batch: FacebookAdsApiBatch = self._api._api.new_batch()
for request in requests:
api_batch.add_request(request, success=success, failure=failure)
api_batch.execute()
yield from records

return records

@backoff_policy
def _get_creatives(self, params: Mapping[str, Any]) -> Iterator:
Expand All @@ -230,7 +230,7 @@ def _get_ads(self, params: Mapping[str, Any]):
return self._api.account.get_ads(params=params, fields=[self.state_pk])

@backoff_policy
def _extend_record(self, ad, fields):
def _extend_record(self, ad: Ad, fields: Sequence[str]):
return ad.api_get(fields=fields).export_all_data()


Expand Down
Loading

0 comments on commit 008e22e

Please sign in to comment.