Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Facebook Marketing: Add lookback window to insights streams #12402

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d277560
WIP: add lookback window to insgiths streams
vladimir-remar Apr 27, 2022
03d5a72
update: docs in
vladimir-remar Apr 27, 2022
84efdab
update: add insights_lookback_window in custom insights
vladimir-remar May 10, 2022
c6d00d7
solve conflicts
vladimir-remar May 10, 2022
50a3a9d
Merge branch 'master' into source-facebook-marketing-add-insights-loo…
vladimir-remar May 19, 2022
d30b66d
update connector version in dockerfile and update facebook-marketing.md
vladimir-remar May 19, 2022
e12deff
fix conflicts
vladimir-remar May 20, 2022
71f326a
formatting using blackFormat
vladimir-remar May 20, 2022
b1481da
add minimun value to insights_lookback_window field and replace exclu…
vladimir-remar May 20, 2022
14b3007
fix unit tests: test_base_insight_streams
vladimir-remar May 20, 2022
439d527
fix order on specs
vladimir-remar May 23, 2022
a0d13f8
update integration spec file
vladimir-remar May 23, 2022
384682b
Merge branch 'master' into source-facebook-marketing-add-insights-loo…
alafanechere May 23, 2022
d549143
update test spec.json
alafanechere May 23, 2022
c86417f
update test spec.json
alafanechere May 23, 2022
3f67e25
fix conflicts
vladimir-remar May 24, 2022
fb66ecb
Merge branch 'master' into source-facebook-marketing-add-insights-loo…
vladimir-remar May 24, 2022
246c2dd
update refresh date
vladimir-remar May 24, 2022
b39262a
update test_base_insight_streams
vladimir-remar May 24, 2022
a6577ed
remove round brackets from insights_lookback_window description
vladimir-remar May 25, 2022
4524fc4
remove monkeypatch for AdsInsights in test_incremental_lookback_perio…
vladimir-remar May 25, 2022
fb99be7
Merge branch 'master' into source-facebook-marketing-add-insights-loo…
vladimir-remar May 30, 2022
fe1b30a
update connector version in Dockerfile
vladimir-remar May 30, 2022
b7d7973
fix typo in changelog
alafanechere May 30, 2022
944783d
Merge branch 'master' into source-facebook-marketing-add-insights-loo…
alafanechere May 31, 2022
7483176
auto-bump connector version
octavia-squidington-iii May 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.49
dockerImageTag: 0.2.50
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
19 changes: 18 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1838,7 +1838,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.49"
- dockerImage: "airbyte/source-facebook-marketing:0.2.50"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
Expand Down Expand Up @@ -2141,6 +2141,14 @@
- "2017-01-26T00:00:00Z"
type: "string"
format: "date-time"
insights_lookback_window:
title: "Custom Insights Lookback Window"
description: "The attribution window"
default: 28
maximum: 28
mininum: 1
exclusiveMinimum: 0
type: "integer"
required:
- "name"
page_size:
Expand All @@ -2153,6 +2161,15 @@
order: 7
exclusiveMinimum: 0
type: "integer"
insights_lookback_window:
title: "Insights Lookback Window"
description: "The attribution window"
default: 28
order: 8
maximum: 28
mininum: 1
exclusiveMinimum: 0
type: "integer"
required:
- "account_id"
- "start_date"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.49
LABEL io.airbyte.version=0.2.50
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@
"examples": ["2017-01-26T00:00:00Z"],
"type": "string",
"format": "date-time"
},
"insights_lookback_window": {
"title": "Custom Insights Lookback Window",
"description": "The attribution window",
"default": 28,
"maximum": 28,
"mininum": 1,
"exclusiveMinimum": 0,
"type": "integer"
}
},
"required": ["name"]
Expand All @@ -301,6 +310,16 @@
"order": 7,
"exclusiveMinimum": 0,
"type": "integer"
},
"insights_lookback_window": {
"title": "Insights Lookback Window",
"description": "The attribution window",
"default": 28,
"order": 8,
"maximum": 28,
"mininum": 1,
"exclusiveMinimum": 0,
"type": "integer"
}
},
"required": ["account_id", "start_date", "access_token"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
api = API(account_id=config.account_id, access_token=config.access_token)

insights_args = dict(
api=api,
start_date=config.start_date,
end_date=config.end_date,
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
)
streams = [
AdAccount(api=api),
Expand Down Expand Up @@ -159,6 +157,7 @@ def _update_insights_streams(self, insights: List[InsightConfig], default_args,
time_increment=insight.time_increment,
start_date=insight.start_date or default_args["start_date"],
end_date=insight.end_date or default_args["end_date"],
insights_lookback_window=insight.insights_lookback_window or default_args["insights_lookback_window"],
)
insight_stream = AdsInsights(**args)
insights_custom_streams.append(insight_stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ class Config:
pattern=DATE_TIME_PATTERN,
examples=["2017-01-26T00:00:00Z"],
)
insights_lookback_window: Optional[PositiveInt] = Field(
title="Custom Insights Lookback Window",
description="The attribution window",
maximum=28,
mininum=1,
default=28,
vladimir-remar marked this conversation as resolved.
Show resolved Hide resolved
)


class ConnectorConfig(BaseConfig):
Expand Down Expand Up @@ -156,3 +163,12 @@ class Config:
"Page size used when sending requests to Facebook API to specify number of records per page when response has pagination. Most users do not need to set this field unless they specifically need to tune the connector to address specific issues or use cases."
),
)

insights_lookback_window: Optional[PositiveInt] = Field(
title="Insights Lookback Window",
order=8,
description="The attribution window",
maximum=28,
mininum=1,
default=28,
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class AdsInsights(FBMarketingIncrementalStream):
# HTTP response.
# https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)
# Facebook freezes insight data 28 days after it was generated, which means that all data
# from the past 28 days may have changed since we last emitted it, so we retrieve it again.
INSIGHTS_LOOKBACK_PERIOD = pendulum.duration(days=28)

action_breakdowns = ALL_ACTION_BREAKDOWNS
level = "ad"
Expand All @@ -64,6 +61,7 @@ def __init__(
breakdowns: List[str] = None,
action_breakdowns: List[str] = None,
time_increment: Optional[int] = None,
insights_lookback_window: int = None,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -74,6 +72,7 @@ def __init__(
self.breakdowns = breakdowns or self.breakdowns
self.time_increment = time_increment or self.time_increment
self._new_class_name = name
self._insights_lookback_window = insights_lookback_window

# state
self._cursor_value: Optional[pendulum.Date] = None # latest period that was read
Expand All @@ -91,6 +90,16 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
"""Build complex PK based on slices and breakdowns"""
return ["date_start", "account_id", "ad_id"] + self.breakdowns

@property
def insights_lookback_period(self):
"""
Facebook freezes insight data 28 days after it was generated, which means that all data
from the past 28 days may have changed since we last emitted it, so we retrieve it again.
But in some cases users my have define their own lookback window, thats
why the value for `insights_lookback_window` is set throught config.
"""
return pendulum.duration(days=self._insights_lookback_window)

def list_objects(self, params: Mapping[str, Any]) -> Iterable:
"""Because insights has very different read_records we don't need this method anymore"""

Expand Down Expand Up @@ -187,7 +196,7 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
"""

today = pendulum.today(tz="UTC").date()
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
refresh_date = today - self.insights_lookback_period

for ts_start in self._date_intervals():
if ts_start in self._completed_slices:
Expand Down Expand Up @@ -235,13 +244,12 @@ def _get_start_date(self) -> pendulum.Date:
"""
today = pendulum.today(tz="UTC").date()
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD

refresh_date = today - self.insights_lookback_period
if self._cursor_value:
start_date = self._cursor_value + pendulum.duration(days=self.time_increment)
if start_date > refresh_date:
logger.info(
f"The cursor value within refresh period ({self.INSIGHTS_LOOKBACK_PERIOD}), start sync from {refresh_date} instead."
f"The cursor value within refresh period ({self.insights_lookback_period}), start sync from {refresh_date} instead."
)
start_date = min(start_date, refresh_date)

Expand All @@ -252,7 +260,6 @@ def _get_start_date(self) -> pendulum.Date:
start_date = self._start_date
if start_date < oldest_date:
logger.warning(f"Loading insights older then {self.INSIGHTS_RETENTION_PERIOD} is not possible. Start sync from {oldest_date}.")

return max(oldest_date, start_date)

def request_params(self, **kwargs) -> MutableMapping[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def async_job_mock_fixture(mocker):

class TestBaseInsightsStream:
def test_init(self, api):
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1))
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

assert not stream.breakdowns
assert stream.action_breakdowns == AdsInsights.ALL_ACTION_BREAKDOWNS
Expand All @@ -66,6 +66,7 @@ def test_init_override(self, api):
name="CustomName",
breakdowns=["test1", "test2"],
action_breakdowns=["field1", "field2"],
insights_lookback_window=28,
)

assert stream.breakdowns == ["test1", "test2"]
Expand All @@ -85,6 +86,7 @@ def test_read_records_all(self, mocker, api):
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
insights_lookback_window=28,
)

records = list(
Expand All @@ -104,11 +106,7 @@ def test_read_records_random_order(self, mocker, api):
job = mocker.Mock(spec=AsyncJob)
job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()]
job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1))
stream = AdsInsights(
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
)
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

records = list(
stream.read_records(
Expand Down Expand Up @@ -143,11 +141,7 @@ def test_read_records_random_order(self, mocker, api):
)
def test_state(self, api, state):
"""State setter/getter should work with all combinations"""
stream = AdsInsights(
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
)
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

assert stream.state == {}

Expand All @@ -162,7 +156,7 @@ def test_state(self, api, state):
def test_stream_slices_no_state(self, api, async_manager_mock, start_date):
"""Stream will use start_date when there is not state"""
end_date = start_date + duration(weeks=2)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
Expand All @@ -179,7 +173,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
"""Stream will use start_date when there is not state and start_date within 28d from now"""
start_date = recent_start_date
end_date = pendulum.now()
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental))
Expand All @@ -197,7 +191,7 @@ def test_stream_slices_with_state(self, api, async_manager_mock, start_date):
end_date = start_date + duration(days=10)
cursor_value = start_date + duration(days=5)
state = {AdsInsights.cursor_field: cursor_value.date().isoformat()}
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
Expand All @@ -216,7 +210,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
end_date = pendulum.now()
cursor_value = end_date - duration(days=1)
state = {AdsInsights.cursor_field: cursor_value.date().isoformat()}
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
Expand All @@ -237,7 +231,7 @@ def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, star
AdsInsights.cursor_field: cursor_value.date().isoformat(),
"slices": [(cursor_value + duration(days=1)).date().isoformat(), (cursor_value + duration(days=3)).date().isoformat()],
}
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=28)
async_manager_mock.completed_jobs.return_value = [1, 2, 3]

slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental))
Expand All @@ -251,7 +245,7 @@ def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, star
assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=4)

def test_get_json_schema(self, api):
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1))
stream = AdsInsights(api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), insights_lookback_window=28)

schema = stream.get_json_schema()

Expand All @@ -261,7 +255,11 @@ def test_get_json_schema(self, api):

def test_get_json_schema_custom(self, api):
stream = AdsInsights(
api=api, start_date=datetime(2010, 1, 1), end_date=datetime(2011, 1, 1), breakdowns=["device_platform", "country"]
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
breakdowns=["device_platform", "country"],
insights_lookback_window=28,
)

schema = stream.get_json_schema()
Expand All @@ -275,6 +273,7 @@ def test_fields(self, api):
api=api,
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
insights_lookback_window=28,
)

fields = stream.fields
Expand All @@ -289,6 +288,7 @@ def test_fields_custom(self, api):
start_date=datetime(2010, 1, 1),
end_date=datetime(2011, 1, 1),
fields=["account_id", "account_currency"],
insights_lookback_window=28,
)

assert stream.fields == ["account_id", "account_currency"]
Expand All @@ -297,7 +297,7 @@ def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
start_date = pendulum.parse("2020-03-01")
end_date = pendulum.parse("2020-05-01")
set_today("2020-04-01")
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10))

monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

Expand All @@ -311,7 +311,7 @@ def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
"time_increment": 1,
}

stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=10)
stream.state = state
assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)}

Expand All @@ -327,11 +327,11 @@ def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
start_date = pendulum.parse("2020-03-01")
end_date = pendulum.parse("2020-05-01")
yesterday, _ = set_today("2020-04-01")
monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20))

monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)

stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
stream = AdsInsights(api=api, start_date=start_date, end_date=end_date, insights_lookback_window=20)

records = read_full_refresh(stream)
assert len(records) == (yesterday - start_date).days + 1
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.50 | 2022-04-27 | [12402](https://github.com/airbytehq/airbyte/pull/12402) | Add lookback window to insights streams |
| 0.2.49 | 2022-05-20 | [13047](https://github.com/airbytehq/airbyte/pull/13047) | Fix duplicating records during insights lookback period |
| 0.2.48 | 2022-05-19 | [13008](https://github.com/airbytehq/airbyte/pull/13008) | Update CDK to v0.1.58 avoid crashing on incorrect stream schemas |
| 0.2.47 | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions |
Expand Down