
Source Facebook marketing: Fix duplicating records during insights lookback period #13047

Merged May 23, 2022 (20 commits)
Commits
6353c19  Don't duplicate syncing inside INSIGHTS_LOOKBACK_PERIOD period (grubberr, May 20, 2022)
638a2ae  test_completed_slices_processed added (grubberr, May 20, 2022)
5f441d8  fix name test_completed_slices_processed -> test_completed_slices_pro… (grubberr, May 20, 2022)
3838e0a  facebook-marketing.md updated (grubberr, May 20, 2022)
4e6a887  Merge branch 'master' into grubberr/oncall-231-source-facebook-marketing (grubberr, May 21, 2022)
f6cfaaa  max sync - today (grubberr, May 21, 2022)
0b02887  test_completed_slices_processed_in_lookback_period improved (grubberr, May 21, 2022)
8edabdb  revert today (grubberr, May 21, 2022)
1e93615  test_incremental_lookback_period_updated added (grubberr, May 21, 2022)
b1ce5a0  simplify logic (grubberr, May 21, 2022)
d445d9c  FakeInsightAsyncJob, FakeInsightAsyncJobManager - extract (grubberr, May 21, 2022)
9df644c  small improvement of test_completed_slices_processed_in_lookback_period (grubberr, May 22, 2022)
b3fe9f1  test_completed_slices_in_lookback_period updated (grubberr, May 22, 2022)
1c962dc  test_incremental_lookback_period_updated - improved (grubberr, May 22, 2022)
0ca0924  test_incremental_lookback_period_updated improved (grubberr, May 22, 2022)
2ccb7ab  test_incremental_lookback_period_updated updated (grubberr, May 22, 2022)
8c188f6  facebook-marketing.md - fixed (grubberr, May 22, 2022)
58be0f6  Merge branch 'master' into grubberr/oncall-231-source-facebook-marketing (grubberr, May 23, 2022)
23f38b1  skip reading today data (grubberr, May 23, 2022)
22ac460  auto-bump connector version (octavia-squidington-iii, May 23, 2022)
Files changed
Dockerfile (connector version bump):

```diff
@@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
 
-LABEL io.airbyte.version=0.2.48
+LABEL io.airbyte.version=0.2.49
 LABEL io.airbyte.name=airbyte/source-facebook-marketing
```
source_facebook_marketing/streams/base_insight_streams.py (module path taken from the test imports below):

```diff
@@ -102,9 +102,20 @@ def read_records(
         stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
         """Waits for current job to finish (slice) and yield its result"""
+
+        today = pendulum.today(tz="UTC").date()
+        date_start = stream_state and stream_state.get("date_start")
+        if date_start:
+            date_start = pendulum.parse(date_start).date()
+
         job = stream_slice["insight_job"]
         for obj in job.get_result():
-            yield obj.export_all_data()
+            record = obj.export_all_data()
+            if date_start:
+                updated_time = pendulum.parse(record["updated_time"]).date()
+                if updated_time <= date_start or updated_time >= today:
+                    continue
+            yield record
 
         self._completed_slices.add(job.interval.start)
         if job.interval.start == self._next_cursor_value:
```
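The guard keeps a record only when its `updated_time` falls strictly between the saved cursor (`date_start`) and today: anything at or before the cursor was already emitted by a previous sync, and anything updated today is still being revised by Facebook. A minimal standalone sketch of the same rule (hypothetical dates and records, not from the PR):

```python
import pendulum

today = pendulum.date(2020, 4, 1)
cursor = pendulum.date(2020, 3, 20)  # stream_state["date_start"]

records = [
    {"date_start": "2020-03-19", "updated_time": "2020-03-20"},  # dropped: not updated since the cursor
    {"date_start": "2020-03-25", "updated_time": "2020-03-28"},  # kept: updated after the cursor
    {"date_start": "2020-03-31", "updated_time": "2020-04-01"},  # dropped: today's data is still mutating
]

# Keep a record iff cursor < updated_time < today, mirroring the two `continue` branches above.
kept = [r for r in records if cursor < pendulum.parse(r["updated_time"]).date() < today]
assert [r["date_start"] for r in kept] == ["2020-03-25"]
```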
```diff
@@ -152,9 +163,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
 
     def _date_intervals(self) -> Iterator[pendulum.Date]:
         """Get date period to sync"""
-        if self._end_date < self._next_cursor_value:
+        yesterday = pendulum.yesterday(tz="UTC").date()
+        end_date = min(self._end_date, yesterday)
+        if end_date < self._next_cursor_value:
             return
-        date_range = self._end_date - self._next_cursor_value
+        date_range = end_date - self._next_cursor_value
         yield from date_range.range("days", self.time_increment)
 
     def _advance_cursor(self):
```
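Clamping the end of the range to yesterday (UTC) means the stream never schedules a slice for the current day (the "skip reading today data" commit): today's insights are still accumulating, so they are deferred to the next sync instead of being synced once and then frozen behind the cursor.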
```diff
@@ -173,9 +186,14 @@ def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
         :return:
         """
 
+        today = pendulum.today(tz="UTC").date()
+        refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
+
         for ts_start in self._date_intervals():
             if ts_start in self._completed_slices:
-                continue
+                if ts_start < refresh_date:
+                    continue
+                self._completed_slices.remove(ts_start)
             ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
             interval = pendulum.Period(ts_start, ts_end)
             yield InsightAsyncJob(api=self._api.api, edge_object=self._api.account, interval=interval, params=params)
```
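With this change, a previously completed slice is re-synced only while it still sits inside the lookback window; older slices are final and stay skipped. A hedged sketch of the decision rule (the helper name is ours, and the 28-day default for `INSIGHTS_LOOKBACK_PERIOD` is an assumption; the tests below monkeypatch it anyway):

```python
from typing import Set

import pendulum

INSIGHTS_LOOKBACK_PERIOD = pendulum.duration(days=28)  # assumed default


def should_schedule(ts_start: pendulum.Date, completed: Set[pendulum.Date], today: pendulum.Date) -> bool:
    """Mirror of the loop body above: decide whether an async job is generated for this slice."""
    refresh_date = today - INSIGHTS_LOOKBACK_PERIOD
    if ts_start in completed:
        if ts_start < refresh_date:
            return False  # finished and outside the lookback window: keep the cached result
        completed.remove(ts_start)  # still inside the window: forget it and re-sync
    return True
```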
The start-date computation now uses an explicit UTC clock instead of the local timezone:

```diff
@@ -215,7 +233,7 @@ def _get_start_date(self) -> pendulum.Date:
         :return: the first date to sync
         """
-        today = pendulum.today().date()
+        today = pendulum.today(tz="UTC").date()
         oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
         refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
 
```
conftest.py gains a `set_today` fixture that freezes `pendulum.today()` and `pendulum.yesterday()`:

```diff
@@ -54,3 +54,15 @@ def api_fixture(some_config, requests_mock, fb_account_response):
     requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response])
     requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response])
     return api
+
+
+@fixture
+def set_today(mocker, monkeypatch):
+    def inner(date: str):
+        today = pendulum.parse(date)
+        yesterday = today - pendulum.duration(days=1)
+        monkeypatch.setattr(pendulum, "today", mocker.MagicMock(return_value=today))
+        monkeypatch.setattr(pendulum, "yesterday", mocker.MagicMock(return_value=yesterday))
+        return yesterday, today
+
+    return inner
```
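A sketch of how a test can use the fixture (hypothetical test, not part of the PR):

```python
import pendulum


def test_today_is_frozen(set_today):
    yesterday, today = set_today("2020-04-01")
    assert str(today.date()) == "2020-04-01"
    assert str(yesterday.date()) == "2020-03-31"

    # Code under test now sees the frozen clock regardless of tz arguments,
    # because the MagicMock ignores them.
    assert pendulum.today(tz="UTC") == today
    assert pendulum.yesterday(tz="UTC") == yesterday
```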
helpers.py (new test-helper module; the whole file is added):

```python
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


from typing import Any, MutableMapping

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


def read_full_refresh(stream_instance: Stream):
    records = []
    slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
    for slice in slices:
        records.extend(list(stream_instance.read_records(stream_slice=slice, sync_mode=SyncMode.full_refresh)))
    return records


def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
    records = []
    stream_instance.state = stream_state
    slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)
    for slice in slices:
        records.extend(list(stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=slice, stream_state=stream_state)))
    stream_state.clear()
    stream_state.update(stream_instance.state)
    return records


class FakeInsightAsyncJobManager:
    def __init__(self, jobs, **kwargs):
        self.jobs = jobs

    def completed_jobs(self):
        yield from self.jobs


class FakeInsightAsyncJob:
    updated_insights = {}

    @classmethod
    def update_insight(cls, date_start, updated_time):
        cls.updated_insights[date_start] = updated_time

    def __init__(self, interval, **kwargs):
        self.interval = interval

    def get_result(self):
        return [self]

    def export_all_data(self):
        date_start = str(self.interval.start)
        return {"date_start": date_start, "updated_time": self.updated_insights.get(date_start, date_start)}
```
test_base_insight_streams.py:

```diff
@@ -6,7 +6,9 @@
 
 import pendulum
 import pytest
+import source_facebook_marketing.streams.base_insight_streams
 from airbyte_cdk.models import SyncMode
+from helpers import FakeInsightAsyncJob, FakeInsightAsyncJobManager, read_full_refresh, read_incremental
 from pendulum import duration
 from source_facebook_marketing.streams import AdsInsights
 from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
```
In the existing slicing tests, the expected job count drops by one because a slice is no longer generated for the current day:

```diff
@@ -186,7 +188,7 @@ def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, rece
         async_manager_mock.assert_called_once()
         args, kwargs = async_manager_mock.call_args
         generated_jobs = list(kwargs["jobs"])
-        assert len(generated_jobs) == (end_date - start_date).days + 1
+        assert len(generated_jobs) == (end_date - start_date).days
         assert generated_jobs[0].interval.start == start_date.date()
         assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
```
```diff
@@ -223,7 +225,7 @@ def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, re
         async_manager_mock.assert_called_once()
         args, kwargs = async_manager_mock.call_args
         generated_jobs = list(kwargs["jobs"])
-        assert len(generated_jobs) == (end_date - start_date).days + 1
+        assert len(generated_jobs) == (end_date - start_date).days
         assert generated_jobs[0].interval.start == start_date.date()
         assert generated_jobs[1].interval.start == start_date.date() + duration(days=1)
```
Two new tests are appended after `test_fields_custom`:

```diff
@@ -290,3 +292,72 @@ def test_fields_custom(self, api):
         )
 
         assert stream.fields == ["account_id", "account_currency"]
+
+    def test_completed_slices_in_lookback_period(self, api, monkeypatch, set_today):
+        start_date = pendulum.parse("2020-03-01")
+        end_date = pendulum.parse("2020-05-01")
+        set_today("2020-04-01")
+        monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=10))
+        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
+        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
+
+        state = {
+            AdsInsights.cursor_field: "2020-03-19",
+            "slices": [
+                "2020-03-21",
+                "2020-03-22",
+                "2020-03-23",
+            ],
+            "time_increment": 1,
+        }
+
+        stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
+        stream.state = state
+        assert stream._completed_slices == {pendulum.Date(2020, 3, 21), pendulum.Date(2020, 3, 22), pendulum.Date(2020, 3, 23)}
+
+        slices = stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)
+        slices = [x["insight_job"].interval.start for x in slices]
+
+        assert pendulum.parse("2020-03-21").date() not in slices
+        assert pendulum.parse("2020-03-22").date() in slices
+        assert pendulum.parse("2020-03-23").date() in slices
+        assert stream._completed_slices == {pendulum.Date(2020, 3, 21)}
```
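With today frozen at 2020-04-01 and a 10-day lookback, the refresh date is 2020-03-22: the completed slice for 2020-03-21 falls outside the window, so it stays in `_completed_slices` and is not re-scheduled, while 2020-03-22 and 2020-03-23 fall inside it and are removed from the completed set and synced again.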

The second new test continues the same hunk:

```diff
+    def test_incremental_lookback_period_updated(self, api, monkeypatch, set_today):
+        start_date = pendulum.parse("2020-03-01")
+        end_date = pendulum.parse("2020-05-01")
+        yesterday, _ = set_today("2020-04-01")
+        monkeypatch.setattr(AdsInsights, "INSIGHTS_LOOKBACK_PERIOD", pendulum.duration(days=20))
+        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJob", FakeInsightAsyncJob)
+        monkeypatch.setattr(source_facebook_marketing.streams.base_insight_streams, "InsightAsyncJobManager", FakeInsightAsyncJobManager)
+
+        stream = AdsInsights(api=api, start_date=start_date, end_date=end_date)
+
+        records = read_full_refresh(stream)
+        assert len(records) == (yesterday - start_date).days + 1
+        assert records[0]["date_start"] == str(start_date.date())
+        assert records[-1]["date_start"] == str(yesterday.date())
+
+        state = {AdsInsights.cursor_field: "2020-03-20", "time_increment": 1}
+        records = read_incremental(stream, state)
+        assert len(records) == (yesterday - pendulum.parse("2020-03-20")).days
+        assert records[0]["date_start"] == "2020-03-21"
+        assert records[-1]["date_start"] == str(yesterday.date())
+        assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}
+
+        yesterday, _ = set_today("2020-04-02")
+        records = read_incremental(stream, state)
+        assert records == [{"date_start": str(yesterday.date()), "updated_time": str(yesterday.date())}]
+        assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}
+
+        yesterday, _ = set_today("2020-04-03")
+        FakeInsightAsyncJob.update_insight("2020-03-26", "2020-04-01")
+        FakeInsightAsyncJob.update_insight("2020-03-27", "2020-04-02")
+        FakeInsightAsyncJob.update_insight("2020-03-28", "2020-04-03")
+
+        records = read_incremental(stream, state)
+        assert records == [
+            {"date_start": "2020-03-27", "updated_time": "2020-04-02"},
+            {"date_start": "2020-04-02", "updated_time": "2020-04-02"},
+        ]
+        assert state == {"date_start": str(yesterday.date()), "slices": [], "time_increment": 1}
```
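The last scenario exercises the whole pipeline: with the cursor at 2020-04-01 and today at 2020-04-03, the stream re-reads the lookback window but emits only records whose `updated_time` lies strictly between the cursor and today, so the late-updated 2020-03-27 insight (updated 2020-04-02) and the fresh 2020-04-02 slice come through, while 2020-03-26 (updated no later than the cursor) and 2020-03-28 (updated "today") are filtered out.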
docs/integrations/sources/facebook-marketing.md (1 addition, 0 deletions):

```diff
@@ -108,6 +108,7 @@ For more information, see the [Facebook Insights API documentation.](https://dev
 
 | Version | Date       | Pull Request                                             | Subject                                                                      |
 |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------|
+| 0.2.49  | 2022-05-20 | [13047](https://github.com/airbytehq/airbyte/pull/13047) | Fix duplicating records during insights lookback period                      |
 | 0.2.48  | 2022-05-19 | [13008](https://github.com/airbytehq/airbyte/pull/13008) | Update CDK to v0.1.58 avoid crashing on incorrect stream schemas             |
 | 0.2.47  | 2022-05-06 | [12685](https://github.com/airbytehq/airbyte/pull/12685) | Update CDK to v0.1.56 to emit an `AirbyeTraceMessage` on uncaught exceptions |
 | 0.2.46  | 2022-04-22 | [12171](https://github.com/airbytehq/airbyte/pull/12171) | Allow configuration of page_size for requests                                |
```