diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 296d70636390b..a9b096d3782cf 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -210,7 +210,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.44 + dockerImageTag: 0.2.45 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 9f925c519434d..aafa7185f3cd2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1748,7 +1748,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.44" +- dockerImage: "airbyte/source-facebook-marketing:0.2.45" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 57484775451d5..dbf4bac16a107 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.44 +LABEL io.airbyte.version=0.2.45 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py index 195fb1d1e84db..c27702cf09ae2 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py @@ -158,7 +158,13 @@ def split_job(self) -> List["AsyncJob"]: new_jobs = [] for job in self._jobs: if job.failed: - new_jobs.extend(job.split_job()) + try: + new_jobs.extend(job.split_job()) + except ValueError as split_limit_error: + logger.error(split_limit_error) + logger.info(f'can\'t split "{job}" any smaller, attempting to retry the job.') + job.restart() + new_jobs.append(job) else: new_jobs.append(job) return new_jobs @@ -202,7 +208,7 @@ def split_job(self) -> List["AsyncJob"]: return self._split_by_edge_class(AdSet) elif isinstance(self._edge_object, AdSet): return self._split_by_edge_class(Ad) - raise RuntimeError("The job is already splitted to the smallest size.") + raise ValueError("The job is already splitted to the smallest size.") def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Type[Ad]]) -> List[AsyncJob]: """Split insight job by creating insight jobs from lower edge object, i.e. diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py index 85cae8307ab36..a24d2deac0bb6 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py @@ -94,6 +94,12 @@ def _check_jobs_status_and_restart(self) -> List[AsyncJob]: self._wait_throttle_limit_down() for job in self._running_jobs: if job.failed: + if isinstance(job, ParentAsyncJob): + # if this job is a ParentAsyncJob, it holds X number of jobs + # we want to check that none of these nested jobs have exceeded MAX_NUMBER_OF_ATTEMPTS + for nested_job in job._jobs: + if nested_job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS: + raise JobException(f"{nested_job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...") if job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS: raise JobException(f"{job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...") elif job.attempt_number == 2: diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py index 6d6e14c135c9b..9ea4581f17d6b 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py @@ -336,7 +336,7 @@ def test_split_job_smallest(self, mocker, api): params = {"time_increment": 1, "breakdowns": []} job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params) - with pytest.raises(RuntimeError, match="The job is already splitted to the smallest size."): + with pytest.raises(ValueError, match="The job is already splitted to the smallest size."): job.split_job() @@ -415,5 +415,19 @@ def test_split_job(self, parent_job, grouped_jobs, mocker): else: job.split_job.assert_not_called() + def test_split_job_smallest(self, parent_job, grouped_jobs): + grouped_jobs[0].failed = True + grouped_jobs[0].split_job.side_effect = ValueError("Mocking smallest size") + + # arbitrarily testing this X times, the max attempts is handled by async_job_manager rather than the job itself. + count = 0 + while count < 10: + split_jobs = parent_job.split_job() + assert len(split_jobs) == len( + grouped_jobs + ), "attempted to split job at smallest size so should just restart job meaning same no. of jobs" + grouped_jobs[0].attempt_number += 1 + count += 1 + def test_str(self, parent_job, grouped_jobs): assert str(parent_job) == f"ParentAsyncJob({grouped_jobs[0]} ... {len(grouped_jobs) - 1} jobs more)" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py index ea3e19a7e2d17..2d3b7ca3e7567 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py @@ -151,3 +151,26 @@ def update_job_behaviour(): with pytest.raises(JobException, match=f"{jobs[1]}: failed more than {InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS} times."): next(manager.completed_jobs(), None) + + def test_nested_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock): + """Manager should fail when a nested job within a ParentAsyncJob failed too many times""" + + def update_job_behaviour(): + jobs[1].failed = True + sub_jobs[1].failed = True + sub_jobs[1].attempt_number = InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS + yield from range(10) + + update_job_mock.side_effect = update_job_behaviour() + sub_jobs = [ + mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), + mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), + ] + jobs = [ + mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), + mocker.Mock(spec=ParentAsyncJob, _jobs=sub_jobs, attempt_number=1, failed=False, completed=False), + ] + manager = InsightAsyncJobManager(api=api, jobs=jobs) + + with pytest.raises(JobException): + next(manager.completed_jobs(), None) diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index ddcadaf1627d8..98bd82243c0f1 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.45 | 2022-05-03 | [12390](https://github.com/airbytehq/airbyte/pull/12390) | Better retry logic for split-up async jobs | | 0.2.44 | 2022-04-14 | [11751](https://github.com/airbytehq/airbyte/pull/11751) | Update API to a directly initialise an AdAccount with the given ID | | 0.2.43 | 2022-04-13 | [11801](https://github.com/airbytehq/airbyte/pull/11801) | Fix `user_tos_accepted` schema to be an object | 0.2.42 | 2022-04-06 | [11761](https://github.com/airbytehq/airbyte/pull/11761) | Upgrade Facebook Python SDK to version 13|