Skip to content

Commit

Permalink
Source Facebook Marketing: Add lookback window to insights streams (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-remar committed May 31, 2022
1 parent c55f185 commit 76032e6
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.49
dockerImageTag: 0.2.50
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
19 changes: 18 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1838,7 +1838,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.49"
- dockerImage: "airbyte/source-facebook-marketing:0.2.50"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
Expand Down Expand Up @@ -2141,6 +2141,14 @@
- "2017-01-26T00:00:00Z"
type: "string"
format: "date-time"
insights_lookback_window:
title: "Custom Insights Lookback Window"
description: "The attribution window"
default: 28
maximum: 28
mininum: 1
exclusiveMinimum: 0
type: "integer"
required:
- "name"
page_size:
Expand All @@ -2153,6 +2161,15 @@
order: 7
exclusiveMinimum: 0
type: "integer"
insights_lookback_window:
title: "Insights Lookback Window"
description: "The attribution window"
default: 28
order: 8
maximum: 28
mininum: 1
exclusiveMinimum: 0
type: "integer"
required:
- "account_id"
- "start_date"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.49
LABEL io.airbyte.version=0.2.50
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@
"examples": ["2017-01-26T00:00:00Z"],
"type": "string",
"format": "date-time"
},
"insights_lookback_window": {
"title": "Custom Insights Lookback Window",
"description": "The attribution window",
"default": 28,
"maximum": 28,
"mininum": 1,
"exclusiveMinimum": 0,
"type": "integer"
}
},
"required": ["name"]
Expand All @@ -301,6 +310,16 @@
"order": 7,
"exclusiveMinimum": 0,
"type": "integer"
},
"insights_lookback_window": {
"title": "Insights Lookback Window",
"description": "The attribution window",
"default": 28,
"order": 8,
"maximum": 28,
"mininum": 1,
"exclusiveMinimum": 0,
"type": "integer"
}
},
"required": ["account_id", "start_date", "access_token"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
api = API(account_id=config.account_id, access_token=config.access_token)

insights_args = dict(
api=api,
start_date=config.start_date,
end_date=config.end_date,
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
)
streams = [
AdAccount(api=api),
Expand Down Expand Up @@ -159,6 +157,7 @@ def _update_insights_streams(self, insights: List[InsightConfig], default_args,
time_increment=insight.time_increment,
start_date=insight.start_date or default_args["start_date"],
end_date=insight.end_date or default_args["end_date"],
insights_lookback_window=insight.insights_lookback_window or default_args["insights_lookback_window"],
)
insight_stream = AdsInsights(**args)
insights_custom_streams.append(insight_stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ class Config:
pattern=DATE_TIME_PATTERN,
examples=["2017-01-26T00:00:00Z"],
)
insights_lookback_window: Optional[PositiveInt] = Field(
title="Custom Insights Lookback Window",
description="The attribution window",
maximum=28,
mininum=1,
default=28,
)


class ConnectorConfig(BaseConfig):
Expand Down Expand Up @@ -156,3 +163,12 @@ class Config:
"Page size used when sending requests to Facebook API to specify number of records per page when response has pagination. Most users do not need to set this field unless they specifically need to tune the connector to address specific issues or use cases."
),
)

insights_lookback_window: Optional[PositiveInt] = Field(
title="Insights Lookback Window",
order=8,
description="The attribution window",
maximum=28,
mininum=1,
default=28,
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class AdsInsights(FBMarketingIncrementalStream):
# HTTP response.
# https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)
# Facebook freezes insight data 28 days after it was generated, which means that all data
# from the past 28 days may have changed since we last emitted it, so we retrieve it again.
INSIGHTS_LOOKBACK_PERIOD = pendulum.duration(days=28)

action_breakdowns = ALL_ACTION_BREAKDOWNS
level = "ad"
Expand All @@ -64,6 +61,7 @@ def __init__(
breakdowns: List[str] = None,
action_breakdowns: List[str] = None,
time_increment: Optional[int] = None,
insights_lookback_window: int = None,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -74,6 +72,7 @@ def __init__(
self.breakdowns = breakdowns or self.breakdowns
self.time_increment = time_increment or self.time_increment
self._new_class_name = name
self._insights_lookback_window = insights_lookback_window

# state
self._cursor_value: Optional[pendulum.Date] = None # latest period that was read
Expand All @@ -91,6 +90,16 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
"""Build complex PK based on slices and breakdowns"""
return ["date_start", "account_id", "ad_id"] + self.breakdowns

@property
def insights_lookback_period(self):
"""
Facebook freezes insight data 28 days after it was generated, which means that all data
from the past 28 days may have changed since we last emitted it, so we retrieve it again.
But in some cases users my have define their own lookback window, thats
why the value for `insights_lookback_window` is set throught config.
"""
return pendulum.duration(days=self._insights_lookback_window)

def list_objects(self, params: Mapping[str, Any]) -> Iterable:
"""Because insights has very different read_records we don't need this method anymore"""

Expand Down Expand Up @@ -187,7 +196,7 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
"""

today = pendulum.today(tz="UTC").date()
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
refresh_date = today - self.insights_lookback_period

for ts_start in self._date_intervals():
if ts_start in self._completed_slices:
Expand Down Expand Up @@ -235,13 +244,12 @@ def _get_start_date(self) -> pendulum.Date:
"""
today = pendulum.today(tz="UTC").date()
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD

refresh_date = today - self.insights_lookback_period
if self._cursor_value:
start_date = self._cursor_value + pendulum.duration(days=self.time_increment)
if start_date > refresh_date:
logger.info(
f"The cursor value within refresh period ({self.INSIGHTS_LOOKBACK_PERIOD}), start sync from {refresh_date} instead."
f"The cursor value within refresh period ({self.insights_lookback_period}), start sync from {refresh_date} instead."
)
start_date = min(start_date, refresh_date)

Expand All @@ -252,7 +260,6 @@ def _get_start_date(self) -> pendulum.Date:
start_date = self._start_date
if start_date < oldest_date:
logger.warning(f"Loading insights older then {self.INSIGHTS_RETENTION_PERIOD} is not possible. Start sync from {oldest_date}.")

return max(oldest_date, start_date)

def request_params(self, **kwargs) -> MutableMapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def async_job_mock_fixture(mocker):

class TestBaseInsightsStream:
def test_init(self, api):
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1))
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

assert not stream.breakdowns
assert stream.action_breakdowns == AdsInsights.ALL_ACTION_BREAKDOWNS
Expand All @@ -66,6 +66,7 @@ def test_init_override(self, api):
name="CustomName",
breakdowns=["test1", "test2"],
action_breakdowns=["field1", "field2"],
insights_lookback_window=28,
)

assert stream.breakdowns == ["test1", "test2"]
Expand All @@ -85,6 +86,7 @@ def test_read_records_all(self, mocker, api):
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
insights_lookback_window=28,
)

records = list(
Expand All @@ -104,11 +106,7 @@ def test_read_records_random_order(self, mocker, api):
job = mocker.Mock(spec=AsyncJob)
job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()]
job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1))
stream = AdsInsights(
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
)
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

records = list(
stream.read_records(
Expand Down Expand Up @@ -143,11 +141,7 @@ def test_read_records_random_order(self, mocker, api):
)
def test_state(self, api, state):
"""State setter/getter should work with all combinations"""
stream = AdsInsights(
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
)
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

assert stream.state == {}

Expand All @@ -162,7 +156,7 @@ def test_state(self, api, state):
def test_stream_slices_no_state(self, api, async_manager_mock, start_date):
"""Stream will use start_date when there is not state"""
end_date = start_date + duration(weeks=2)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
Expand All @@ -179,7 +173,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
"""Stream will use start_date when there is not state and start_date within 28d from now"""
start_date = recent_start_date
end_date = pendulum.now()
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
Expand All @@ -197,7 +191,7 @@ def test_stream_slices_with_state(self, api, async_manager_mock, start_date):
end_date = start_date + duration(days=10)
cursor_value = start_date + duration(days=5)
state = {AdsInsights.cursor_field: cursor_value.date().isoformat()}
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
Expand All @@ -216,7 +210,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
end_date = pendulum.now()
cursor_value = end_date - duration(days=1)
state = {AdsInsights.cursor_field: cursor_value.date().isoformat()}
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
Expand All @@ -237,7 +231,7 @@ def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, star
AdsInsights.cursor_field: cursor_value.date().isoformat(),
"slices": [(cursor_value + duration(days=1)).date().isoformat(), (cursor_value + duration(days=3)).date().isoformat()],
}
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
Expand All @@ -251,7 +245,7 @@ def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, star
assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=4)

def test_get_json_schema(self, api):
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1))
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

schema = stream.get_json_schema()

Expand All @@ -261,7 +255,11 @@ def test_get_json_schema(self, api):

def test_get_json_schema_custom(self, api):
stream = AdsInsights(
api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), breakdowns=["device_platform", "country"]
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
breakdowns=["device_platform", "country"],
insights_lookback_window=28,
)

schema = stream.get_json_schema()
Expand All @@ -275,6 +273,7 @@ def test_fields(self, api):
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
insights_lookback_window=28,
)

fields = stream.fields
Expand All @@ -289,6 +288,7 @@ def test_fields_custom(self, api):
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
fields=["account_id", "account_currency"],
insights_lookback_window=28,
)

assert stream.fields == ["account_id", "account_currency"]
Expand All @@ -297,7 +297,7 @@ def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
start_date = pendulum.parse("2020-03-01")
end_date = pendulum.parse("2020-05-01")
set_today("2020-04-01")
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10))

monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

Expand All @@ -311,7 +311,7 @@ def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
"time_increment": 1,
}

stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=10)
stream.state = state
assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)}

Expand All @@ -327,11 +327,11 @@ def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
start_date = pendulum.parse("2020-03-01")
end_date = pendulum.parse("2020-05-01")
yesterday, _ = set_today("2020-04-01")
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20))

monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=20)

records = read_full_refresh(stream)
assert len(records) == (yesterday - start_date).days + 1
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.50 | 2022-04-27 | [12402](https://github.com/airbytehq/airbyte/pull/12402) | Add lookback window to insights streams |
| 0.2.49 | 2022-05-20 | [13047](https://github.com/airbytehq/airbyte/pull/13047) | Fix duplicating records during insights lookback period |
| 0.2.48 | 2022-05-19 | [13008](https://github.com/airbytehq/airbyte/pull/13008) | Update CDK to v0.1.58 avoid crashing on incorrect stream schemas |
| 0.2.47 | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions |
Expand Down

0 comments on commit 76032e6

Please sign in to comment.