From 16b7dc1e9cf965a3c671ce16f8e2515680a517bf Mon Sep 17 00:00:00 2001
From: Eugene Kulak
Date: Fri, 4 Feb 2022 10:17:06 +0200
Subject: [PATCH] fix tests

---
 .../streams/async_job.py            | 59 ++++++-------
 .../streams/base_insight_streams.py |  2 +-
 .../unit_tests/test_async_job.py    | 39 ++++++------
 3 files changed, 40 insertions(+), 60 deletions(-)

diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
index 0e27a57da88ade..e558cf900b73e4 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
@@ -49,9 +49,9 @@ def __init__(self, api: FacebookAdsApi, interval: pendulum.Period):
         self._attempt_number = 1
 
     @property
-    def key(self) -> str:
+    def key(self) -> Any:
         """Job identifier, in most cases start of the interval"""
-        return str(self._interval.start.date())
+        return self._interval.start
 
     @abstractmethod
     def start(self):
@@ -176,20 +176,18 @@ def split_job(self) -> "AsyncJob":
         """
         campaign_params = dict(copy.deepcopy(self._params))
         # get campaigns from attribution window as well (28 day + 1 current day)
-        new_start = self._interval.start.date() - pendulum.duration(days=28 + 1)
+        new_start = self._interval.start - pendulum.duration(days=28 + 1)
         campaign_params.update(fields=["campaign_id"], level="campaign")
         campaign_params["time_range"].update(since=new_start.to_date_string())
         campaign_params.pop("time_increment")  # query all days
         result = self._edge_object.get_insights(params=campaign_params)
         campaign_ids = set(row["campaign_id"] for row in result)
-        logger.info(
-            "Got %(num)s campaigns for period %(period)s: %(campaign_ids)s",
-            num=len(campaign_ids),
-            period=self._params["time_range"],
-            campaign_ids=campaign_ids,
-        )
-
-        jobs = [InsightAsyncJob(api=self._api, edge_object=Campaign(pk), params=self._params) for pk in campaign_ids]
+        logger.info(f"Got {len(campaign_ids)} campaigns for period {self._interval}: {campaign_ids}")
+
+        jobs = [
+            InsightAsyncJob(api=self._api, edge_object=Campaign(pk), params=self._params, interval=self._interval)
+            for pk in campaign_ids
+        ]
         return ParentAsyncJob(api=self._api, interval=self._interval, jobs=jobs)
 
     def start(self):
@@ -199,13 +197,7 @@ def start(self):
 
         self._job = self._edge_object.get_insights(params=self._params, is_async=True)
         self._start_time = pendulum.now()
-        logger.info(
-            "Created AdReportRun: %(job_id)s to sync insights %(time_range)s with breakdown %(breakdowns)s for %(obj)s",
-            job_id=self._job["report_run_id"],
-            time_range=self._params["time_range"],
-            breakdowns=self._params["breakdowns"],
-            obj=self._edge_object,
-        )
+        logger.info(f"{self}: created AdReportRun")
 
     def restart(self):
         """Restart failed job"""
@@ -218,7 +210,7 @@ def restart(self):
         self._finish_time = None
         self._attempt_number += 1
         self.start()
-        logger.info("%s: restarted", self)
+        logger.info(f"{self}: restarted.")
 
     @property
     def elapsed_time(self) -> Optional[pendulum.duration]:
@@ -250,7 +242,7 @@ def _batch_success_handler(self, response: FacebookResponse):
 
     def _batch_failure_handler(self, response: FacebookResponse):
         """Update job status from response"""
-        logger.info("Request failed with response: %s", response.body())
+        logger.info(f"{self}: Request failed with response: {response.body()}.")
 
     def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
         """Method to retrieve job's status, separated because of retry handler"""
@@ -258,12 +250,9 @@ def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
             raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started")
 
         if self.completed:
-            logger.info(
-                "%(job)s is %(percent)s complete (%(status)s)",
-                job=self,
-                percent=self._job["async_percent_completion"],
-                status=self._job["async_status"],
-            )
+            job_status = self._job["async_status"]
+            percent = self._job["async_percent_completion"]
+            logger.info(f"{self}: is {percent} complete ({job_status})")
             # No need to update job status if its already completed
             return
 
@@ -279,12 +268,8 @@ def _check_status(self) -> bool:
         :return: True if the job is completed, False - if the job is still running
         """
         job_status = self._job["async_status"]
-        logger.info(
-            "%(job)s is %(percent)s complete (%(status)s)",
-            job=self,
-            percent=self._job["async_percent_completion"],
-            status=job_status,
-        )
+        percent = self._job["async_percent_completion"]
+        logger.info(f"{self}: is {percent} complete ({job_status})")
 
         if job_status == Status.COMPLETED:
             self._finish_time = pendulum.now()  # TODO: is not actual running time, but interval between check_status calls
@@ -292,12 +277,7 @@ def _check_status(self) -> bool:
         elif job_status in [Status.FAILED, Status.SKIPPED]:
             self._finish_time = pendulum.now()
             self._failed = True
-            logger.info(
-                "%(job)s has status %(status)s after %(elapsed)s seconds.",
-                job=self,
-                status=job_status,
-                elapsed=self.elapsed_time.in_seconds(),
-            )
+            logger.info(f"{self}: has status {job_status} after {self.elapsed_time.in_seconds()} seconds.")
             return True
 
         return False
@@ -311,6 +291,5 @@ def get_result(self) -> Any:
     def __str__(self) -> str:
         """String representation of the job wrapper."""
         job_id = self._job["report_run_id"] if self._job else "<None>"
-        time_range = self._params["time_range"]
         breakdowns = self._params["breakdowns"]
-        return f"InsightAsyncJob(id={job_id}, {self._edge_object}, time_range={time_range}, breakdowns={breakdowns}"
+        return f"InsightAsyncJob(id={job_id}, {self._edge_object}, time_range={self._interval}, breakdowns={breakdowns})"
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
index ae688c7370ac8d..2207acbf2f4ae6 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
@@ -158,7 +158,7 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
             if ts_start in self._completed_slices:
                 continue
             ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
-            interval = pendulum.Period(ts_start, ts_end.date())
+            interval = pendulum.Period(ts_start, ts_end)
             yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params)
 
     def stream_slices(
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py
index 31a5df243420f3..470149730a2a07 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py
@@ -6,6 +6,7 @@
 from typing import Iterator
 from unittest.mock import call
 
+import pendulum
 import pytest
 from facebook_business.adobjects.adreportrun import AdReportRun
 from facebook_business.api import FacebookAdsApiBatch
@@ -38,23 +39,21 @@ def job_fixture(api, account, mocker):
         "fields": ["field1", "field2"],
         "time_increment": 1,
         "action_attribution_windows": [],
-        "time_range": {
-            "since": "2019-01-01",
-            "until": "2019-01-01",
-        },
     }
+    interval = pendulum.Period(pendulum.Date(2019, 1, 1), pendulum.Date(2019, 1, 1))
 
-    return InsightAsyncJob(edge_object=account, api=api, params=params)
+    return InsightAsyncJob(edge_object=account, api=api, interval=interval, params=params)
 
 
 @pytest.fixture(name="grouped_jobs")
 def grouped_jobs_fixture(mocker):
-    return [mocker.Mock(spec=InsightAsyncJob, restart_number=0, failed=False, completed=False) for _ in range(10)]
+    return [mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False) for _ in range(10)]
 
 
 @pytest.fixture(name="parent_job")
 def parent_job_fixture(api, grouped_jobs):
-    return ParentAsyncJob(api=api, jobs=grouped_jobs)
+    interval = pendulum.Period(pendulum.Date(2019, 1, 1), pendulum.Date(2019, 1, 1))
+    return ParentAsyncJob(api=api, jobs=grouped_jobs, interval=interval)
 
 
 @pytest.fixture(name="started_job")
@@ -121,12 +120,12 @@ def test_start_already_started(self, job):
             job.start()
 
     def test_restart(self, failed_job, api, adreport):
-        assert failed_job.restart_number == 0
+        assert failed_job.attempt_number == 1
 
         failed_job.restart()
 
         assert not failed_job.failed, "restart should reset fail flag"
-        assert failed_job.restart_number == 1
+        assert failed_job.attempt_number == 2
 
     def test_restart_when_job_not_failed(self, job, api):
         job.start()
@@ -220,9 +219,12 @@ def test_failed_yes(self, failed_job):
         assert failed_job.failed, "should return True if the job previously failed"
 
     def test_str(self, api, account):
-        job = InsightAsyncJob(edge_object=account, api=api, params={"time_range": 123, "breakdowns": [10, 20]})
+        interval = pendulum.Period(pendulum.Date(2010, 1, 1), pendulum.Date(2011, 1, 1))
+        job = InsightAsyncJob(
+            edge_object=account, api=api, params={"breakdowns": [10, 20]}, interval=interval,
+        )
 
-        assert str(job) == f"AdReportRun(id=<None>, {account}, time_range=123, breakdowns=[10, 20]"
+        assert str(job) == f"InsightAsyncJob(id=<None>, {account}, time_range=<Period [2010-01-01 -> 2011-01-01]>, breakdowns=[10, 20])"
 
     def test_get_result(self, job, adreport):
         job.start()
@@ -233,11 +235,11 @@ def test_get_result(self, job, adreport):
         assert result == adreport.get_result.return_value, "should return result from job"
 
     def test_get_result_when_job_is_not_started(self, job):
-        with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started of failed"):
+        with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started or failed"):
             job.get_result()
 
     def test_get_result_when_job_is_failed(self, failed_job):
-        with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started of failed"):
+        with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started or failed"):
             failed_job.get_result()
 
     def test_split_job(self, job, account, mocker, api):
@@ -251,8 +253,7 @@ def test_split_job(self, job, account, mocker, api):
         campaign_mock.assert_has_calls([call(1), call(2), call(3)])
         assert parent_job_mock.called
         assert parent_job
-        args, kwargs = parent_job_mock.call_args
-        assert args == (api,)
+        _args, kwargs = parent_job_mock.call_args
         assert len(kwargs["jobs"]) == 3, "number of jobs should match number of campaigns"
 
 
@@ -267,15 +268,15 @@ def test_restart(self, parent_job, grouped_jobs):
 
         # fail some jobs
         grouped_jobs[0].failed = True
-        grouped_jobs[0].restart_number = 1
+        grouped_jobs[0].attempt_number = 2
         grouped_jobs[5].failed = True
-        grouped_jobs[0].restart_number = 1
-        grouped_jobs[6].restart_number = 3
+        grouped_jobs[0].attempt_number = 2
+        grouped_jobs[6].attempt_number = 3
 
         assert parent_job.failed, "should be failed if any job failed"
         parent_job.restart()
         assert parent_job.failed
-        assert parent_job.restart_number == 3, "restart should be max value of all jobs"
+        assert parent_job.attempt_number == 3, "restart should be max value of all jobs"
 
     def test_completed(self, parent_job, grouped_jobs):
         assert not parent_job.completed, "initially not completed"