Skip to content

Commit

Permalink
🎉 Connector Facebook-Marketing: update insights streams with custom e…
Browse files Browse the repository at this point in the history
…ntries for fields, breakdowns and action_breakdowns (#7158)

* Connector Facebook-Marketing: update streams with custom streams

* update: remove custom streams, add new custom insights from config

* update: add new model for InsightConfig, remove old imports

* fix: format to source file and streams file

* update Changelog

* update: add to check a validation to insights entries, update documentation and fix to resolve  in spec schema

* fix: fix import logger from entrypoint, change to python logger

* fix: change error message from check custom insights entries, fix logger import

Co-authored-by: vladimir <vladimir.remar@gmail.com>
  • Loading branch information
yevhenii-ldv and vladimir-remar committed Oct 19, 2021
1 parent 214b158 commit b18acd6
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 12 deletions.
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.20",
"dockerImageTag": "0.2.21",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
"icon": "facebook.svg"
}
Expand Up @@ -147,7 +147,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.20
dockerImageTag: 0.2.21
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.20
LABEL io.airbyte.version=0.2.21
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Expand Up @@ -53,9 +53,94 @@
"minimum": 1,
"maximum": 30,
"type": "integer"
},
"custom_insights": {
"title": "Custom Insights",
"description": "A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
"type": "array",
"items": {
"title": "InsightConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name value of insight",
"type": "string"
},
"fields": {
"title": "Fields",
"description": "A list of chosen fields for fields parameter",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"breakdowns": {
"title": "Breakdowns",
"description": "A list of chosen breakdowns for breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"action_breakdowns": {
"title": "Action Breakdowns",
"description": "A list of chosen action_breakdowns for action_breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["name"]
}
}
},
"required": ["account_id", "access_token", "start_date"]
"required": ["account_id", "access_token", "start_date"],
"definitions": {
"InsightConfig": {
"title": "InsightConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name value of insight",
"type": "string"
},
"fields": {
"title": "Fields",
"description": "A list of chosen fields for fields parameter",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"breakdowns": {
"title": "Breakdowns",
"description": "A list of chosen breakdowns for breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"action_breakdowns": {
"title": "Action Breakdowns",
"description": "A list of chosen action_breakdowns for action_breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["name"]
}
}
},
"supportsIncremental": true,
"supported_destination_sync_modes": ["append"],
Expand Down
Expand Up @@ -6,14 +6,16 @@
from time import sleep

import pendulum
from airbyte_cdk.entrypoint import logger
import logging
from cached_property import cached_property
from facebook_business import FacebookAdsApi
from facebook_business.adobjects import user as fb_user
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.common import FacebookAPIException

logger = logging.getLogger(__name__)


class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""
Expand Down
Expand Up @@ -7,7 +7,7 @@

import backoff
import pendulum
from airbyte_cdk.entrypoint import logger # FIXME (Eugene K): register logger as standard python logger
import logging
from facebook_business.exceptions import FacebookRequestError

# The Facebook API error codes indicating rate-limiting are listed at
Expand All @@ -16,6 +16,8 @@
FACEBOOK_UNKNOWN_ERROR_CODE = 99
DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1)

logger = logging.getLogger(__name__)


class FacebookAPIException(Exception):
"""General class for all API errors"""
Expand Down
@@ -0,0 +1,38 @@
{
"properties": {
"action_device": {"type": ["null", "string"]},
"action_canvas_component_name": {"type": ["null", "string"]},
"action_carousel_card_id": {"type": ["null", "string"]},
"action_carousel_card_name": {"type": ["null", "string"]},
"action_destination": {"type": ["null", "string"]},
"action_reaction": {"type": ["null", "string"]},
"action_target_id": {"type": ["null", "string"]},
"action_type": {"type": ["null", "string"]},
"action_video_sound": {"type": ["null", "string"]},
"action_video_type": {"type": ["null", "string"]},
"ad_format_asset": {"type": ["null", "string"]},
"age": {"type": ["null", "string"]},
"app_id": {"type": ["null", "string"]},
"body_asset": {"type": ["null", "string"]},
"call_to_action_asset": {"type": ["null", "string"]},
"country": {"type": ["null", "string"]},
"description_asset": {"type": ["null", "string"]},
"device_platform": {"type": ["null", "string"]},
"dma": {"type": ["null", "string"]},
"frequency_value": {"type": ["null", "string"]},
"gender": {"type": ["null", "string"]},
"hourly_stats_aggregated_by_advertiser_time_zone": {"type": ["null", "string"]},
"hourly_stats_aggregated_by_audience_time_zone": {"type": ["null", "string"]},
"image_asset": {"type": ["null", "string"]},
"impression_device": {"type": ["null", "string"]},
"link_url_asset": {"type": ["null", "string"]},
"place_page_id": {"type": ["null", "string"]},
"platform_position": {"type": ["null", "string"]},
"product_id": {"type": ["null", "string"]},
"publisher_platform": {"type": ["null", "string"]},
"region": {"type": ["null", "string"]},
"skan_conversion_id": {"type": ["null", "string"]},
"title_asset": {"type": ["null", "string"]},
"video_asset": {"type": ["null", "string"]}
}
}
Expand Up @@ -2,13 +2,22 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Any, List, Mapping, Optional, Tuple, Type, MutableMapping
from jsonschema import RefResolver


from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteConnectionStatus, AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification, Status

from typing import Any, List, Mapping, Optional, Tuple, Type

import pendulum
from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pydantic import BaseModel, Field
from source_facebook_marketing.api import API
from source_facebook_marketing.streams import (
Expand All @@ -26,6 +35,19 @@
)


logger = logging.getLogger(__name__)

class InsightConfig(BaseModel):

name: str = Field(description="The name value of insight")

fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter", default=[])

breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns", default=[])

action_breakdowns: Optional[List[str]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[])


class ConnectorConfig(BaseModel):
class Config:
title = "Source Facebook Marketing"
Expand Down Expand Up @@ -65,6 +87,9 @@ class Config:
minimum=1,
maximum=30,
)
custom_insights: Optional[List[InsightConfig]] = Field(
description="A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
)


class SourceFacebookMarketing(AbstractSource):
Expand Down Expand Up @@ -104,10 +129,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
days_per_job=config.insights_days_per_job,
)

return [
streams = [
Campaigns(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
AdSets(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Ads(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),

AdCreatives(api=api),
AdsInsights(**insights_args),
AdsInsightsAgeAndGender(**insights_args),
Expand All @@ -118,6 +144,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
AdsInsightsActionType(**insights_args),
]

return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams)

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
try:
check_succeeded, error = self.check_connection(logger, config)
if not check_succeeded:
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(e))

self._check_custom_insights_entries(config.get('custom_insights', []))

return AirbyteConnectionStatus(status=Status.SUCCEEDED)


def spec(self, *args, **kwargs) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
Expand All @@ -128,11 +170,80 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification:
changelogUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing",
supportsIncremental=True,
supported_destination_sync_modes=[DestinationSyncMode.append],
connectionSpecification=ConnectorConfig.schema(),
connectionSpecification=expand_local_ref(ConnectorConfig.schema()),
authSpecification=AuthSpecification(
auth_type="oauth2.0",
oauth2Specification=OAuth2Specification(
rootObject=[], oauthFlowInitParameters=[], oauthFlowOutputParameters=[["access_token"]]
),
)
),
)

def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]]:
"""Update method, if insights have values returns streams replacing the
default insights streams else returns streams
"""
if not insights:
return streams

insights_custom_streams = list()

for insight in insights:
args["name"] = f"Custom{insight.name}"
args["fields"] = list(set(insight.fields))
args["breakdowns"] = list(set(insight.breakdowns))
args["action_breakdowns"] = list(set(insight.action_breakdowns))
insight_stream = AdsInsights(**args)
insights_custom_streams.append(insight_stream)

return streams + insights_custom_streams

def _check_custom_insights_entries(self, insights: List[Mapping[str, Any]]):

default_fields = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights").get("properties", {}).keys())
default_breakdowns = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights_breakdowns").get("properties", {}).keys())
default_actions_breakdowns = [e for e in default_breakdowns if 'action_' in e]

for insight in insights:
if insight.get('fields'):
value_checked, value = self._check_values(default_fields, insight.get('fields'))
if not value_checked:
message = f"{value} is not a valid field name"
raise Exception("Config validation error: " + message) from None
if insight.get('breakdowns'):
value_checked, value = self._check_values(default_breakdowns, insight.get('breakdowns'))
if not value_checked:
message = f"{value} is not a valid breakdown name"
raise Exception("Config validation error: " + message) from None
if insight.get('action_breakdowns'):
value_checked, value = self._check_values(default_actions_breakdowns, insight.get('action_breakdowns'))
if not value_checked:
message = f"{value} is not a valid action_breakdown name"
raise Exception("Config validation error: " + message) from None

return True

def _check_values(self, default_value: List[str], custom_value: List[str]) -> Tuple[bool, Any]:
for e in custom_value:
if e not in default_value:
logger.error(f"{e} does not appear in {default_value}")
return False, e

return True, None


def expand_local_ref(schema, resolver=None, **kwargs):
resolver = resolver or RefResolver("", schema)
if isinstance(schema, MutableMapping):
if "$ref" in schema:
ref_url = schema.pop("$ref")
url, resolved_schema = resolver.resolve(ref_url)
schema.update(resolved_schema)
for key, value in schema.items():
schema[key] = expand_local_ref(value, resolver=resolver)
return schema
elif isinstance(schema, List):
return [expand_local_ref(item, resolver=resolver) for item in schema]

return schema

0 comments on commit b18acd6

Please sign in to comment.