Skip to content

Commit

Permalink
Source Facebook Marketing: Attempt to retry failing jobs that are already split to minimum size (#12390)
Browse files Browse the repository at this point in the history

* restart jobs that are already split to smallest size

* manager now fails on nested jobs hitting max attempts

* version bump

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
Phlair and octavia-squidington-iii committed May 3, 2022
1 parent 3ab0899 commit 9ffd5bb
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.44
dockerImageTag: 0.2.45
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.44"
- dockerImage: "airbyte/source-facebook-marketing:0.2.45"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.44
LABEL io.airbyte.version=0.2.45
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ def split_job(self) -> List["AsyncJob"]:
new_jobs = []
for job in self._jobs:
if job.failed:
new_jobs.extend(job.split_job())
try:
new_jobs.extend(job.split_job())
except ValueError as split_limit_error:
logger.error(split_limit_error)
logger.info(f'can\'t split "{job}" any smaller, attempting to retry the job.')
job.restart()
new_jobs.append(job)
else:
new_jobs.append(job)
return new_jobs
Expand Down Expand Up @@ -202,7 +208,7 @@ def split_job(self) -> List["AsyncJob"]:
return self._split_by_edge_class(AdSet)
elif isinstance(self._edge_object, AdSet):
return self._split_by_edge_class(Ad)
raise RuntimeError("The job is already splitted to the smallest size.")
raise ValueError("The job is already splitted to the smallest size.")

def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Type[Ad]]) -> List[AsyncJob]:
"""Split insight job by creating insight jobs from lower edge object, i.e.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
self._wait_throttle_limit_down()
for job in self._running_jobs:
if job.failed:
if isinstance(job, ParentAsyncJob):
# if this job is a ParentAsyncJob, it holds X number of jobs
# we want to check that none of these nested jobs have exceeded MAX_NUMBER_OF_ATTEMPTS
for nested_job in job._jobs:
if nested_job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
raise JobException(f"{nested_job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
if job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
raise JobException(f"{job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
elif job.attempt_number == 2:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def test_split_job_smallest(self, mocker, api):
params = {"time_increment": 1, "breakdowns": []}
job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params)

with pytest.raises(RuntimeError, match="The job is already splitted to the smallest size."):
with pytest.raises(ValueError, match="The job is already splitted to the smallest size."):
job.split_job()


Expand Down Expand Up @@ -415,5 +415,19 @@ def test_split_job(self, parent_job, grouped_jobs, mocker):
else:
job.split_job.assert_not_called()

def test_split_job_smallest(self, parent_job, grouped_jobs):
    """Splitting a parent whose failed child cannot be split keeps the job count unchanged.

    The first grouped job is marked as failed and its ``split_job`` is mocked to
    raise ``ValueError``; every call to ``parent_job.split_job()`` is then expected
    to return as many jobs as there are in ``grouped_jobs``.
    """
    unsplittable = grouped_jobs[0]
    unsplittable.failed = True
    unsplittable.split_job.side_effect = ValueError("Mocking smallest size")

    # The number of rounds is arbitrary: the attempt limit is enforced by
    # async_job_manager, not by the job itself.
    for _ in range(10):
        result = parent_job.split_job()
        assert len(result) == len(
            grouped_jobs
        ), "attempted to split job at smallest size so should just restart job meaning same no. of jobs"
        unsplittable.attempt_number += 1

def test_str(self, parent_job, grouped_jobs):
    """The string form of the parent job names its first child and counts the rest."""
    first_job = grouped_jobs[0]
    remaining = len(grouped_jobs) - 1
    expected = f"ParentAsyncJob({first_job} ... {remaining} jobs more)"
    assert str(parent_job) == expected
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,26 @@ def update_job_behaviour():

with pytest.raises(JobException, match=f"{jobs[1]}: failed more than {InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS} times."):
next(manager.completed_jobs(), None)

def test_nested_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock):
    """Manager should raise JobException once a job nested in a ParentAsyncJob hits the attempt limit."""
    # Two plain insight jobs that will sit inside the parent job.
    finished_child = mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True)
    pending_child = mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False)
    sub_jobs = [finished_child, pending_child]

    # Top-level jobs handed to the manager: one finished job and the parent wrapping the children.
    finished_job = mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True)
    parent = mocker.Mock(spec=ParentAsyncJob, _jobs=sub_jobs, attempt_number=1, failed=False, completed=False)
    jobs = [finished_job, parent]

    def fail_nested_job():
        # Runs on the first use of the mocked update: mark the parent and its
        # second child as failed, with the child already at the attempt limit.
        jobs[1].failed = True
        sub_jobs[1].failed = True
        sub_jobs[1].attempt_number = InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS
        yield from range(10)

    update_job_mock.side_effect = fail_nested_job()
    manager = InsightAsyncJobManager(api=api, jobs=jobs)

    with pytest.raises(JobException):
        next(manager.completed_jobs(), None)
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.45 | 2022-05-03 | [12390](https://github.com/airbytehq/airbyte/pull/12390) | Better retry logic for split-up async jobs |
| 0.2.44 | 2022-04-14 | [11751](https://github.com/airbytehq/airbyte/pull/11751) | Update API to directly initialise an AdAccount with the given ID |
| 0.2.43 | 2022-04-13 | [11801](https://github.com/airbytehq/airbyte/pull/11801) | Fix `user_tos_accepted` schema to be an object |
| 0.2.42 | 2022-04-06 | [11761](https://github.com/airbytehq/airbyte/pull/11761) | Upgrade Facebook Python SDK to version 13|
Expand Down

0 comments on commit 9ffd5bb

Please sign in to comment.