From 58b21c2e6b8b594a6e0734b763d0a26d106d6542 Mon Sep 17 00:00:00 2001
From: Eugene Kulak
Date: Thu, 3 Feb 2022 23:21:59 +0200
Subject: [PATCH] address comments from @sherifnada

---
 .../streams/async_job.py            | 135 +++++++++++-------
 .../streams/async_job_manager.py    |  12 +-
 .../streams/base_insight_streams.py |   4 +-
 3 files changed, 92 insertions(+), 59 deletions(-)

diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
index 23f78202efde06..b71b8472032fec 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
@@ -6,18 +6,19 @@
 import logging
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Any, List, Mapping, Optional
+from typing import Any, Iterator, List, Mapping, Optional, Sequence, Union

 import pendulum
+from facebook_business.adobjects.adaccount import AdAccount
 from facebook_business.adobjects.adreportrun import AdReportRun
 from facebook_business.adobjects.campaign import Campaign
 from facebook_business.adobjects.objectparser import ObjectParser
-from facebook_business.api import FacebookAdsApiBatch, FacebookResponse
+from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch, FacebookResponse

 logger = logging.getLogger("airbyte")


-def chunks(data, n):
-    """Yield successive n-sized chunks from lst."""
+def chunks(data: Sequence[Any], n: int) -> Iterator[Sequence[Any]]:
+    """Yield successive n-sized chunks from data."""
     for i in range(0, len(data), n):
         yield data[i : i + n]
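A quick illustration of the helper above (not part of the patch itself): `chunks` splits a sequence into fixed-size batches, useful when packing work into API batches.

    assert list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]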
@@ -37,6 +38,21 @@ class Status(str, Enum):


 class AsyncJob(ABC):
     """Abstract AsyncJob base class"""

+    def __init__(self, api: FacebookAdsApi, interval: pendulum.Period):
+        """Init generic async job
+
+        :param api: FB API instance (to create batches, etc.)
+        :param interval: interval for which the job will fetch data
+        """
+        self._api = api
+        self._interval = interval
+        self._attempt_number = 1
+
+    @property
+    def key(self) -> str:
+        """Job identifier, in most cases the start date of the interval"""
+        return str(self._interval.start.date())
+
     @abstractmethod
     def start(self):
         """Start remote job"""
@@ -46,9 +62,9 @@ def restart(self):
         """Restart failed job"""

     @property
-    @abstractmethod
-    def restart_number(self):
-        """Number of restarts"""
+    def attempt_number(self) -> int:
+        """Number of attempts, starting from 1"""
+        return self._attempt_number

     @property
     @abstractmethod
@@ -61,21 +77,28 @@ def failed(self) -> bool:
         """Tell if the job previously failed"""

     @abstractmethod
-    def update_job(self, batch=None):
-        """Method to retrieve job's status, separated because of retry handler"""
+    def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
+        """Method to retrieve job's status, separated because of retry handler
+
+        :param batch: FB batch executor
+        """

     @abstractmethod
-    def get_result(self) -> Any:
+    def get_result(self) -> Iterator[Any]:
         """Retrieve result of the finished job."""

+    @abstractmethod
+    def split_job(self) -> "AsyncJob":
+        """Split an existing job into a few smaller ones grouped by a ParentAsyncJob"""
+

 class ParentAsyncJob(AsyncJob):
     """Group of async jobs"""

-    def __init__(self, api, jobs: List[AsyncJob]):
-        self._api = api
+    def __init__(self, jobs: List[AsyncJob], **kwargs):
+        """Initialize the group with a list of jobs; other kwargs are passed through to AsyncJob"""
+        super().__init__(**kwargs)
         self._jobs = jobs
-        self._restart_number = 0

     def start(self):
         """Start each job in the group."""
@@ -87,12 +110,7 @@ def restart(self):
         for job in self._jobs:
             if job.failed:
                 job.restart()
-                self._restart_number = max(self._restart_number, job.restart_number)
-
-    @property
-    def restart_number(self):
-        """Number of restarts"""
-        return self._restart_number
+                self._attempt_number = max(self._attempt_number, job.attempt_number)

     @property
     def completed(self) -> bool:
@@ -104,7 +122,7 @@ def failed(self) -> bool:
         """Tell if any job previously failed"""
         return any(job.failed for job in self._jobs)

-    def update_job(self, batch=None):
+    def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
         """Checks jobs status in advance and restart if some failed."""
         batch = self._api.new_batch()
         unfinished_jobs = [job for job in self._jobs if not job.completed]
@@ -117,51 +135,62 @@
         # FacebookAdsApiBatch object with those calls
         batch = batch.execute()

-    def get_result(self) -> Any:
+    def get_result(self) -> Iterator[Any]:
         """Retrieve result of the finished job."""
         for job in self._jobs:
             yield from job.get_result()

     def split_job(self) -> "AsyncJob":
-        """Split existing job in few smaller ones grouped by ParentAsyncJob class"""
+        """A ParentAsyncJob is already a group of split jobs and cannot be split further."""
         raise RuntimeError("Splitting of ParentAsyncJob is not allowed.")


 class InsightAsyncJob(AsyncJob):
     """AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job"""

-    def __init__(self, api, edge_object: Any, params: Mapping[str, Any], key: Optional[Any] = None):
+    page_size = 100
+
+    def __init__(self, edge_object: Union[AdAccount, Campaign], params: Mapping[str, Any], **kwargs):
         """Initialize

-        :param api: FB API
         :param edge_object: Account, Campaign, (AdSet or Ad in future)
         :param params: job params, required to start/restart job
         """
-        self._api = api
-        self._params = params
+        super().__init__(**kwargs)
+        self._params = dict(params)
+        self._params["time_range"] = {
+            "since": self._interval.start.to_date_string(),
+            "until": self._interval.end.to_date_string(),
+        }
+
         self._edge_object = edge_object
         self._job: Optional[AdReportRun] = None
         self._start_time = None
         self._finish_time = None
         self._failed = False
-        self._restart_number = 0
-        self.key = key

-    def split_job(self) -> ParentAsyncJob:
-        """Split existing job in few smaller ones grouped by ParentAsyncJob class"""
+    def split_job(self) -> "AsyncJob":
+        """Split an existing job into a few smaller ones grouped by a ParentAsyncJob.
+
+        TODO: use some cache to avoid expensive queries across different streams.
+        """
         campaign_params = dict(copy.deepcopy(self._params))
         # get campaigns from attribution window as well (28 day + 1 current day)
-        new_start = pendulum.parse(self._params["time_range"]["since"]) - pendulum.duration(days=28 + 1)
+        new_start = self._interval.start.date() - pendulum.duration(days=28 + 1)
         campaign_params.update(fields=["campaign_id"], level="campaign")
         campaign_params["time_range"].update(since=new_start.to_date_string())
         campaign_params.pop("time_increment")  # query all days
         result = self._edge_object.get_insights(params=campaign_params)
         campaign_ids = set(row["campaign_id"] for row in result)
-        logger.info(f"Got {len(campaign_ids)} campaigns for period {self._params['time_range']}: {campaign_ids}")
+        logger.info(
+            "Got %s campaigns for period %s: %s",
+            len(campaign_ids),
+            self._params["time_range"],
+            campaign_ids,
+        )

-        return ParentAsyncJob(self._api, jobs=[InsightAsyncJob(self._api, Campaign(pk), self._params) for pk in campaign_ids])
+        jobs = [
+            InsightAsyncJob(api=self._api, edge_object=Campaign(pk), params=self._params, interval=self._interval)
+            for pk in campaign_ids
+        ]
+        return ParentAsyncJob(api=self._api, interval=self._interval, jobs=jobs)

     def start(self):
         """Start remote job"""
@@ -170,10 +199,13 @@ def start(self):
         self._job = self._edge_object.get_insights(params=self._params, is_async=True)
         self._start_time = pendulum.now()
-        job_id = self._job["report_run_id"]
-        time_range = self._params["time_range"]
-        breakdowns = self._params["breakdowns"]
-        logger.info(f"Created AdReportRun: {job_id} to sync insights {time_range} with breakdown {breakdowns} for {self._edge_object}")
+        logger.info(
+            "Created AdReportRun: %s to sync insights %s with breakdown %s for %s",
+            self._job["report_run_id"],
+            self._params["time_range"],
+            self._params["breakdowns"],
+            self._edge_object,
+        )

     def restart(self):
         """Restart failed job"""
@@ -184,14 +216,9 @@ def restart(self):
         self._failed = False
         self._start_time = None
         self._finish_time = None
-        self._restart_number += 1
+        self._attempt_number += 1
         self.start()
-        logger.info(f"{self}: restarted")
-
-    @property
-    def restart_number(self):
-        """Number of restarts"""
-        return self._restart_number
+        logger.info("%s: restarted", self)

     @property
     def elapsed_time(self) -> Optional[pendulum.duration]:
@@ -218,13 +245,12 @@ def failed(self) -> bool:

     def _batch_success_handler(self, response: FacebookResponse):
         """Update job status from response"""
-        print("GOT", response.json())
         self._job = ObjectParser(reuse_object=self._job).parse_single(response.json())
         self._check_status()

     def _batch_failure_handler(self, response: FacebookResponse):
         """Update job status from response"""
-        logger.info(f"Request failed with response: {response.body()}")
+        logger.info("Request failed with response: %s", response.body())

     def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
         """Method to retrieve job's status, separated because of retry handler"""
@@ -232,8 +258,10 @@ def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
         if not self._job:
             raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started")

         if self.completed:
-            job_progress_pct = self._job["async_percent_completion"]
-            logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})")
+            logger.info(
+                "%s is %s%% complete (%s)", self, self._job["async_percent_completion"], self._job["async_status"]
+            )
             # No need to update job status if it's already completed
             return
@@ -248,9 +276,11 @@ def _check_status(self) -> bool:

         :return: True if the job is completed, False - if the job is still running
         """
-        job_progress_pct = self._job["async_percent_completion"]
         job_status = self._job["async_status"]
-        logger.info(f"{self} is {job_progress_pct}% complete ({job_status})")
+        logger.info(
+            "%s is %s%% complete (%s)", self, self._job["async_percent_completion"], job_status
+        )

         if job_status == Status.COMPLETED:
             self._finish_time = pendulum.now()  # TODO: is not actual running time, but interval between check_status calls
             return True
@@ -258,7 +288,10 @@ def _check_status(self) -> bool:
         elif job_status in [Status.FAILED, Status.SKIPPED]:
             self._finish_time = pendulum.now()
             self._failed = True
-            logger.info(f"{self._job} has status {job_status} after {self.elapsed_time.in_seconds()} seconds.")
+            logger.info(
+                "%s has status %s after %s seconds.", self, job_status, self.elapsed_time.in_seconds()
+            )
             return True

         return False

-    def get_result(self) -> Any:
+    def get_result(self) -> Iterator[Any]:
         """Retrieve result of the finished job."""
         if not self._job or self.failed:
-            raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started of failed")
-        return self._job.get_result(params={"limit": 100})
+            raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started or failed")
+        return self._job.get_result(params={"limit": self.page_size})

     def __str__(self) -> str:
         """String representation of the job wrapper."""
         job_id = self._job["report_run_id"] if self._job else ""
         time_range = self._params["time_range"]
         breakdowns = self._params["breakdowns"]
-        return f"AdReportRun(id={job_id}, {self._edge_object}, time_range={time_range}, breakdowns={breakdowns}"
+        return f"InsightAsyncJob(id={job_id}, {self._edge_object}, time_range={time_range}, breakdowns={breakdowns})"
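To see how a caller drives one of these jobs end to end, here is a minimal polling sketch (not part of the patch; real scheduling, batching and throttling live in InsightAsyncJobManager below, and the 30s sleep mirrors its JOB_STATUS_UPDATE_SLEEP_SECONDS; `api`, `account`, `params` and `interval` are assumed to be prepared by the caller):

    import time

    job = InsightAsyncJob(api=api, edge_object=account, params=params, interval=interval)
    job.start()
    while not (job.completed or job.failed):
        time.sleep(30)      # mirrors JOB_STATUS_UPDATE_SLEEP_SECONDS
        job.update_job()    # refresh remote status
    records = list(job.get_result()) if not job.failed else []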
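One more note before the manager changes: the counter semantics changed with this patch. restart_number started at 0 and counted restarts, while attempt_number starts at 1 and counts tries, which is why the manager below splits the job on attempt_number == 2. A hypothetical trace:

    job = InsightAsyncJob(api=api, edge_object=account, params=params, interval=interval)
    assert job.attempt_number == 1   # first try
    job.restart()                    # after a failure; attempt_number becomes 2
    # a second failure now triggers job.split_job() instead of another plain restart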
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
index 60e05378fadba4..c74753a3c92963 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
@@ -27,7 +27,7 @@ class InsightAsyncJobManager:
     # When current insights throttle hit this value no new jobs added.
     THROTTLE_LIMIT = 70
-    FAILED_JOBS_RESTART_COUNT = 5
+    MAX_NUMBER_OF_ATTEMPTS = 5
     # Time to wait before checking job status update again.
     JOB_STATUS_UPDATE_SLEEP_SECONDS = 30
     # Maximum of concurrent jobs that could be scheduled. Since throttling
@@ -113,15 +113,15 @@ def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
         self._wait_throttle_limit_down()
         for job in self._running_jobs:
             if job.failed:
-                if job.restart_number >= self.FAILED_JOBS_RESTART_COUNT:
-                    raise JobException(f"Job {job} failed more than {self.FAILED_JOBS_RESTART_COUNT} times. Terminating...")
-                elif job.restart_number:
-                    logger.info(f"Job {job} failed, trying to split job into smaller chunks (campaigns).")
+                if job.attempt_number >= self.MAX_NUMBER_OF_ATTEMPTS:
+                    raise JobException(f"{job}: failed more than {self.MAX_NUMBER_OF_ATTEMPTS} times. Terminating...")
+                elif job.attempt_number == 2:
+                    logger.info("%s: failed second time, trying to split job into smaller chunks (campaigns).", job)
                     group_job = job.split_job()
                     running_jobs.append(group_job)
                     group_job.start()
                 else:
-                    logger.info(f"Job {job} failed, restarting")
+                    logger.info("%s: failed, restarting", job)
                     job.restart()
                     running_jobs.append(job)
                 failed_num += 1
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
index 346411af6957a4..8eb12d7343a59f 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
@@ -129,7 +129,7 @@ def state(self) -> MutableMapping[str, Any]:
         return {}

     @state.setter
-    def state(self, value: MutableMapping[str, Any]):
+    def state(self, value: Mapping[str, Any]):
         """State setter"""
         self._cursor_value = pendulum.parse(value[self.cursor_field]).date() if value.get(self.cursor_field) else None
         self._completed_slices = set(pendulum.parse(v).date() for v in value.get("slices", []))
@@ -177,7 +177,7 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
             yield InsightAsyncJob(self._api.api, edge_object=self._api.account, params=total_params, key=ts_start)

     def stream_slices(
-        self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
+        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
         """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
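For clarity on the state setter above, the incoming mapping holds the stream's cursor value plus the completed per-day slices. The cursor field name is stream-specific ("date_start" is assumed here purely for illustration), with dates as ISO strings:

    state = {
        "date_start": "2022-01-15",              # cursor value, parsed to a date
        "slices": ["2022-01-13", "2022-01-14"],  # completed daily slices
    }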