From ae503c32a8e653bfbf6e90842090bd2ea60a869e Mon Sep 17 00:00:00 2001
From: Eugene Kulak
Date: Sun, 23 Jan 2022 15:25:01 +0000
Subject: [PATCH] format

---
 .../integration_tests/spec.json          | 32 ++++-----
 .../source_facebook_marketing/source.py  | 16 +++++---
 .../streams/async_job.py                 | 24 ++++++-----
 .../streams/async_job_manager.py         | 16 ++++----
 .../streams/insights_streams.py          | 41 +++++++++++--------
 .../streams/streams.py                   | 17 ++++++--
 6 files changed, 76 insertions(+), 70 deletions(-)

diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json
index 19b0f6cddb9d86..5debd07b2087fd 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json
@@ -10,9 +10,7 @@
       "description": "The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
       "order": 0,
       "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
-      "examples": [
-        "2017-01-25T00:00:00Z"
-      ],
+      "examples": ["2017-01-25T00:00:00Z"],
       "type": "string",
       "format": "date-time"
     },
@@ -21,9 +19,7 @@
       "description": "The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.",
       "order": 1,
       "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
-      "examples": [
-        "2017-01-26T00:00:00Z"
-      ],
+      "examples": ["2017-01-26T00:00:00Z"],
       "type": "string",
       "format": "date-time"
     },
@@ -31,9 +27,7 @@
       "title": "Account ID",
       "description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
       "order": 2,
-      "examples": [
-        "111111111111111"
-      ],
+      "examples": ["111111111111111"],
       "type": "string"
     },
     "access_token": {
@@ -99,32 +93,20 @@
               }
             }
           },
-          "required": [
-            "name"
-          ]
+          "required": ["name"]
         }
       }
     },
-    "required": [
-      "start_date",
-      "account_id",
-      "access_token"
-    ]
+    "required": ["start_date", "account_id", "access_token"]
   },
   "supportsIncremental": true,
-  "supported_destination_sync_modes": [
-    "append"
-  ],
+  "supported_destination_sync_modes": ["append"],
   "authSpecification": {
     "auth_type": "oauth2.0",
     "oauth2Specification": {
       "rootObject": [],
       "oauthFlowInitParameters": [],
-      "oauthFlowOutputParameters": [
-        [
-          "access_token"
-        ]
-      ]
+      "oauthFlowOutputParameters": [["access_token"]]
     }
   }
 }
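
Note on the spec.json hunks above: they are formatting-only (short JSON arrays collapsed onto one line); the validation pattern for start_date/end_date is unchanged. As a quick local sanity check of a config value against that pattern, here is a minimal sketch: the regex is copied verbatim from spec.json, while the helper name is illustrative and not part of the connector.

    import re

    # Pattern copied verbatim from spec.json (start_date / end_date).
    DATE_TIME_PATTERN = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"

    def is_valid_config_date(value: str) -> bool:
        # Illustrative helper for local checks only.
        return re.match(DATE_TIME_PATTERN, value) is not None

    assert is_valid_config_date("2017-01-25T00:00:00Z")
    assert not is_valid_config_date("2017-01-25")  # missing time part is rejected
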
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
index 9a5464ea10ebac..005c4598f3911c 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
@@ -4,23 +4,29 @@

 import logging
 from datetime import datetime
-from typing import Any, List, Mapping, Optional, Tuple, Type, Iterator, MutableMapping
+from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Type

 import pendulum
 from airbyte_cdk.logger import AirbyteLogger
 from airbyte_cdk.models import (
     AirbyteConnectionStatus,
+    AirbyteMessage,
     AuthSpecification,
+    ConfiguredAirbyteStream,
     ConnectorSpecification,
     DestinationSyncMode,
     OAuth2Specification,
-    Status, AirbyteMessage, ConfiguredAirbyteStream, SyncMode,
+    Status,
+    SyncMode,
 )
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.config import BaseConfig
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.core import package_name_from_class
-from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, InternalConfig
+from airbyte_cdk.sources.utils.schema_helpers import (
+    InternalConfig,
+    ResourceSchemaLoader,
+)
 from pydantic import BaseModel, Field
 from source_facebook_marketing.api import API
 from source_facebook_marketing.streams import (
@@ -104,7 +110,7 @@ class Config:
     custom_insights: Optional[List[InsightConfig]] = Field(
         title="Custom Insights",
         order=6,
-        description="A list which contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
+        description="A list which contains insights entries, each entry must have a name and can contain fields, breakdowns or action_breakdowns",
     )


@@ -255,7 +261,7 @@ def _read_incremental(
         connector_state: MutableMapping[str, Any],
         internal_config: InternalConfig,
     ) -> Iterator[AirbyteMessage]:
-        """ We override this method because we need to inject new state handling.
+        """We override this method because we need to inject new state handling.
         Old way: pass stream_state in read_records and other methods
         call stream_state = stream_instance.get_updated_state(stream_state, record_data) for each record
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
index 161e56542819dd..dbe6531c717a2a 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
@@ -5,14 +5,13 @@

 import logging
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Any, Mapping, Optional, List
+from typing import Any, List, Mapping, Optional

 import pendulum
 from facebook_business.adobjects.adreportrun import AdReportRun
 from facebook_business.adobjects.campaign import Campaign
 from facebook_business.adobjects.objectparser import ObjectParser
-from facebook_business.api import FacebookResponse, FacebookAdsApiBatch
-
+from facebook_business.api import FacebookAdsApiBatch, FacebookResponse

 logger = logging.getLogger("airbyte")

@@ -20,7 +19,7 @@
 def chunks(data, n):
     """Yield successive n-sized chunks from lst."""
     for i in range(0, len(data), n):
-        yield data[i: i + n]
+        yield data[i : i + n]


 class Status(str, Enum):
@@ -61,7 +60,7 @@ def failed(self) -> bool:
         """Tell if the job previously failed"""

     @abstractmethod
-    def update_job(self, batch = None):
+    def update_job(self, batch=None):
         """Method to retrieve job's status, separated because of retry handler"""

     @abstractmethod
@@ -70,8 +69,8 @@ def get_result(self) -> Any:


 class ParentAsyncJob(AsyncJob):
-    """ Group of async jobs
-    """
+    """Group of async jobs"""
+
     def __init__(self, api, jobs: List[AsyncJob]):
         self._api = api
         self._jobs = jobs
@@ -129,7 +128,7 @@ def get_result(self) -> Any:
         for job in self._jobs:
             yield from job.get_result()

-    def split_job(self) -> 'AsyncJob':
+    def split_job(self) -> "AsyncJob":
         """Split existing job in few smaller ones grouped by ParentAsyncJob class"""
         raise RuntimeError("Splitting of ParentAsyncJob is not allowed.")

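
Note on the chunks() helper reformatted above: it splits a list into fixed-size pieces, which is how this module can group many child jobs into Graph API request batches (batch size is capped at 50 elsewhere in this connector, cf. MAX_BATCH_SIZE in streams.py). A minimal sketch of the behavior, with placeholder strings standing in for AsyncJob instances:

    def chunks(data, n):
        """Yield successive n-sized chunks from data."""
        for i in range(0, len(data), n):
            yield data[i : i + n]

    jobs = ["job_%d" % i for i in range(120)]  # placeholders, not real AsyncJobs
    assert [len(batch) for batch in chunks(jobs, 50)] == [50, 50, 20]
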
@@ -156,7 +155,7 @@ def __init__(self, api, edge_object: Any, params: Mapping[str, Any], key: Option

     def split_job(self) -> ParentAsyncJob:
         """Split existing job in few smaller ones grouped by ParentAsyncJob class.
-        TODO: use some cache to avoid expensive queries across different streams. 
+        TODO: use some cache to avoid expensive queries across different streams.
         """
         campaign_params = dict(copy.deepcopy(self._params))
         # get campaigns from attribution window as well (28 day + 1 current day)
@@ -177,8 +176,11 @@ def start(self, batch=None):

         if batch is not None:
             self._edge_object.get_insights(
-                params=self._params, is_async=True, batch=batch,
-                success=self._batch_success_handler, failure=self._batch_failure_handler,
+                params=self._params,
+                is_async=True,
+                batch=batch,
+                success=self._batch_success_handler,
+                failure=self._batch_failure_handler,
             )
         else:
             self._job = self._edge_object.get_insights(params=self._params, is_async=True)
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
index e6f783f97d56e7..8c8ae23f13527c 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
@@ -4,9 +4,10 @@

 import logging
 import time
-from typing import Tuple, Iterator, List, TYPE_CHECKING
+from typing import TYPE_CHECKING, Iterator, List, Tuple

 from facebook_business.api import FacebookAdsApiBatch
+
 from .async_job import AsyncJob

 if TYPE_CHECKING:
@@ -34,8 +35,8 @@ class InsightAsyncJobManager:
     MAX_JOBS_IN_QUEUE = 100
     MAX_JOBS_TO_CHECK = 50

-    def __init__(self, api: 'API', jobs: Iterator[AsyncJob]):
-        """ Init
+    def __init__(self, api: "API", jobs: Iterator[AsyncJob]):
+        """Init

         :param api:
         :param jobs:
@@ -50,10 +51,7 @@ def _start_jobs(self):
         self._update_api_throttle_limit()
         self._wait_throttle_limit_down()
         prev_jobs_count = len(self._running_jobs)
-        while (
-            self._get_current_throttle_value() < self.THROTTLE_LIMIT
-            and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE
-        ):
+        while self._get_current_throttle_value() < self.THROTTLE_LIMIT and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE:
             job = next(iter(self._jobs), None)
             if not job:
                 self._empty = True
@@ -68,7 +66,7 @@ def _start_jobs(self):
         )

     def completed_jobs(self) -> Iterator[AsyncJob]:
-        """ Wait until job is ready and return it. If job
+        """Wait until a job is ready and return it. If the job
         failed try to restart it for FAILED_JOBS_RESTART_COUNT times.
         After job is completed new jobs added according to current throttling limit.

@@ -87,7 +85,7 @@ def completed_jobs(self) -> Iterator[AsyncJob]:
             self._start_jobs()

     def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
-        """ Checks jobs status in advance and restart if some failed.
+        """Check jobs status in advance and restart failed ones.

         :return: list of completed jobs
         """
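
Note on _start_jobs() above: the reflowed while condition now reads as a single rule, keep starting jobs while the account throttle is below THROTTLE_LIMIT and fewer than MAX_JOBS_IN_QUEUE jobs are running. A standalone sketch of that control flow; get_throttle is a stand-in for _get_current_throttle_value(), and the THROTTLE_LIMIT value is assumed for illustration (it is not shown in this diff):

    THROTTLE_LIMIT = 70.0  # assumed value; the real constant lives on InsightAsyncJobManager
    MAX_JOBS_IN_QUEUE = 100  # mirrors the class attribute shown above

    def start_jobs(pending_jobs, running_jobs, get_throttle):
        """Start queued jobs while the throttle and queue size allow it."""
        while get_throttle() < THROTTLE_LIMIT and len(running_jobs) < MAX_JOBS_IN_QUEUE:
            job = next(pending_jobs, None)
            if job is None:
                break  # nothing left to schedule
            job.start()
            running_jobs.append(job)
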
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/insights_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/insights_streams.py
index a1a8809e156576..0f5e013a870005 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/insights_streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/insights_streams.py
@@ -4,7 +4,16 @@

 import copy
 import logging
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Iterator, Union
+from typing import (
+    Any,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Union,
+)

 import airbyte_cdk.sources.utils.casing as casing
 import pendulum
@@ -12,7 +21,7 @@
 from airbyte_cdk.sources.streams.core import package_name_from_class
 from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
 from cached_property import cached_property
-from source_facebook_marketing.streams.async_job import InsightAsyncJob, AsyncJob
+from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
 from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager

 from .streams import FBMarketingIncrementalStream
@@ -57,12 +66,12 @@ class AdsInsights(FBMarketingIncrementalStream):
     breakdowns = []

     def __init__(
-            self,
-            name: str = None,
-            fields: List[str] = None,
-            breakdowns: List[str] = None,
-            action_breakdowns: List[str] = None,
-            **kwargs,
+        self,
+        name: str = None,
+        fields: List[str] = None,
+        breakdowns: List[str] = None,
+        action_breakdowns: List[str] = None,
+        **kwargs,
     ):
         super().__init__(**kwargs)
         self._start_date = self._start_date.date()
@@ -79,7 +88,7 @@ def __init__(

     @property
     def name(self) -> str:
-        """ We override stream name to let the user change it via configuration."""
+        """We override the stream name to let the user change it via configuration."""
         name = self._new_class_name or self.__class__.__name__
         return casing.camel_to_snake(name)

@@ -95,11 +104,11 @@ def _get_campaign_ids(self, params) -> List[str]:
         return list(set(row["campaign_id"] for row in result))

     def read_records(
-            self,
-            sync_mode: SyncMode,
-            cursor_field: List[str] = None,
-            stream_slice: Mapping[str, Any] = None,
-            stream_state: Mapping[str, Any] = None,
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
         """Waits for current job to finish (slice) and yield its result"""
         job = stream_slice["insight_job"]
@@ -147,7 +156,7 @@ def _advance_cursor(self):
             self._cursor_value = ts_start

     def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
-        """ Generator of async jobs
+        """Generator of async jobs

         :param params:
         :return:
@@ -184,7 +193,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
             yield {"insight_job": job}

     def _get_start_date(self) -> pendulum.Date:
-        """ Get start date to begin sync with. It is not that trivial as it might seem.
+        """Get start date to begin sync with. It is not as trivial as it might seem.
         There are few rules:
         - don't read data older than start_date
         - re-read data within last 28 days
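
Note on the _get_start_date() docstring above: its rules (never read data older than the configured start_date, re-read the last 28 days to pick up late attribution) reduce to a clamp. A minimal sketch using pendulum; the function and constant names are illustrative, not the connector's:

    import pendulum

    LOOKBACK_DAYS = 28  # attribution window referenced in the comments above

    def refreshed_start_date(configured_start: pendulum.Date, cursor: pendulum.Date) -> pendulum.Date:
        """Resume from the cursor minus the lookback window, but never before start_date."""
        return max(configured_start, cursor.subtract(days=LOOKBACK_DAYS))

    # e.g. a cursor of 2022-01-23 re-reads from 2021-12-26, unless start_date is later:
    assert refreshed_start_date(pendulum.date(2021, 1, 1), pendulum.date(2022, 1, 23)) == pendulum.date(2021, 12, 26)
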
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py
index a4d7ee4994a312..dda511351cfc3d 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py
@@ -6,7 +6,16 @@
 import logging
 from abc import ABC
 from datetime import datetime
-from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, TYPE_CHECKING, Optional
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+)

 import pendulum
 import requests
@@ -16,6 +25,7 @@
 from cached_property import cached_property
 from facebook_business.adobjects.abstractobject import AbstractObject
 from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
+
 from .common import deep_merge

 if TYPE_CHECKING:
@@ -53,7 +63,7 @@ class FBMarketingStream(Stream, ABC):

     MAX_BATCH_SIZE = 50

-    def __init__(self, api: 'API', include_deleted: bool = False, **kwargs):
+    def __init__(self, api: "API", include_deleted: bool = False, **kwargs):
         super().__init__(**kwargs)
         self._api = api
         self._include_deleted = include_deleted if self.enable_deleted else False
@@ -216,8 +226,7 @@ def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):

     @cached_property
     def fields(self) -> List[str]:
-        """ Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook
-        """
+        """Remove the "thumbnail_data_url" field because it is a computed field and not one we can request from Facebook"""
         return [f for f in super().fields if f != "thumbnail_data_url"]

     def read_records(